http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 82b353c..e594e0d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -17,10 +17,17 @@ */ package org.apache.phoenix.mapreduce.index; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM; + import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; import org.apache.commons.cli.CommandLine; @@ -36,11 +43,15 @@ import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; @@ -48,9 +59,13 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.phoenix.compile.PostIndexDDLCompiler; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.hbase.index.util.IndexManagementUtil; +import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.CsvBulkImportUtil; import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder; @@ -62,9 +77,9 @@ import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; @@ -72,6 +87,8 @@ import org.apache.phoenix.util.TransactionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + /** * An MR job to populate the index table from the data 
table. * @@ -85,7 +102,11 @@ public class IndexTool extends Configured implements Tool { private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, "Data table name (mandatory)"); private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true, - "Index table name(mandatory)"); + "Index table name(not required in case of partial rebuilding)"); + + private static final Option PARTIAL_REBUILD_OPTION = new Option("pr", "partial-rebuild", false, + "To build indexes for a data table from least disabledTimeStamp"); + private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false, "If specified, we avoid the bulk load (optional)"); private static final Option RUN_FOREGROUND_OPTION = @@ -105,6 +126,7 @@ public class IndexTool extends Configured implements Tool { options.addOption(SCHEMA_NAME_OPTION); options.addOption(DATA_TABLE_OPTION); options.addOption(INDEX_TABLE_OPTION); + options.addOption(PARTIAL_REBUILD_OPTION); options.addOption(DIRECT_API_OPTION); options.addOption(RUN_FOREGROUND_OPTION); options.addOption(OUTPUT_PATH_OPTION); @@ -139,18 +161,18 @@ public class IndexTool extends Configured implements Tool { + "parameter"); } - if (!cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) { - throw new IllegalStateException(INDEX_TABLE_OPTION.getLongOpt() + " is a mandatory " - + "parameter"); - } - - if (!cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) { - throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " - + "parameter"); - } - - if (!cmdLine.hasOption(DIRECT_API_OPTION.getOpt()) - && cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt())) { + if (!(cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) || cmdLine.hasOption(DIRECT_API_OPTION.getOpt())) + && !cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) { + throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " + "parameter"); + } + + if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) { + throw new IllegalStateException("Index name should not be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt()); + } + + if (!(cmdLine.hasOption(DIRECT_API_OPTION.getOpt())) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt()) + && cmdLine.hasOption(RUN_FOREGROUND_OPTION + .getOpt())) { throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt() + " is applicable only for " + DIRECT_API_OPTION.getLongOpt()); } @@ -167,50 +189,155 @@ public class IndexTool extends Configured implements Tool { formatter.printHelp("help", options); System.exit(exitCode); } + + class JobFactory { + Connection connection; + Configuration configuration; + private Path outputPath; - @Override - public int run(String[] args) throws Exception { - Connection connection = null; - try { - CommandLine cmdLine = null; - try { - cmdLine = parseOptions(args); - } catch (IllegalStateException e) { - printHelpAndExit(e.getMessage(), getOptions()); + public JobFactory(Connection connection, Configuration configuration, Path outputPath) { + this.connection = connection; + this.configuration = configuration; + this.outputPath = outputPath; + + } + + public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild) throws Exception { + if (isPartialBuild) { + return configureJobForPartialBuild(schemaName, dataTable); + } else { + return configureJobForAysncIndex(schemaName, indexTable, dataTable, useDirectApi); } - final Configuration configuration = 
HBaseConfiguration.addHbaseResources(getConf()); - final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); - final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); - final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); + } + + private Job configureJobForPartialBuild(String schemaName, String dataTable) throws Exception { final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); - final String qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable); - + final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); connection = ConnectionUtil.getInputConnection(configuration); - if (!isValidIndexTable(connection, qDataTable, indexTable)) { - throw new IllegalArgumentException(String.format( - " %s is not an index table for %s ", qIndexTable, qDataTable)); + long minDisableTimestamp = HConstants.LATEST_TIMESTAMP; + PTable indexWithMinDisableTimestamp = null; + + //Get Indexes in building state, minDisabledTimestamp + List<String> disableIndexes = new ArrayList<String>(); + List<PTable> disabledPIndexes = new ArrayList<PTable>(); + for (PTable index : pdataTable.getIndexes()) { + if (index.getIndexState().equals(PIndexState.BUILDING)) { + disableIndexes.add(index.getTableName().getString()); + disabledPIndexes.add(index); + if (minDisableTimestamp > index.getIndexDisableTimestamp()) { + minDisableTimestamp = index.getIndexDisableTimestamp(); + indexWithMinDisableTimestamp = index; + } + } + } + + if (indexWithMinDisableTimestamp == null) { + throw new Exception("There is no index for a datatable to be rebuild:" + qDataTable); + } + if (minDisableTimestamp == 0) { + throw new Exception("It seems Index " + indexWithMinDisableTimestamp + + " has disable timestamp as 0 , please run IndexTool with IndexName to build it first"); + // TODO probably we can initiate the job by ourself or can skip them while making the list for partial build with a warning + } + + long maxTimestamp = getMaxRebuildAsyncDate(schemaName, disableIndexes); + + //serialize index maintaienr in job conf with Base64 TODO: Need to find better way to serialize them in conf. 
+ List<IndexMaintainer> maintainers = Lists.newArrayListWithExpectedSize(disabledPIndexes.size()); + for (PTable index : disabledPIndexes) { + maintainers.add(index.getIndexMaintainer(pdataTable, connection.unwrap(PhoenixConnection.class))); + } + ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY); + IndexMaintainer.serializeAdditional(pdataTable, indexMetaDataPtr, disabledPIndexes, connection.unwrap(PhoenixConnection.class)); + PhoenixConfigurationUtil.setIndexMaintainers(configuration, indexMetaDataPtr); + + //Prepare raw scan + Scan scan = IndexManagementUtil.newLocalStateScan(maintainers); + scan.setTimeRange(minDisableTimestamp - 1, maxTimestamp); + scan.setRaw(true); + scan.setCacheBlocks(false); + if (pdataTable.isTransactional()) { + long maxTimeRange = pdataTable.getTimeStamp() + 1; + scan.setAttribute(BaseScannerRegionObserver.TX_SCN, + Bytes.toBytes(Long.valueOf(Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))))); + } + + + String physicalTableName=pdataTable.getPhysicalName().getString(); + final String jobName = String.format("Phoenix Indexes build for " + pdataTable.getName().toString()); + + PhoenixConfigurationUtil.setInputTableName(configuration, qDataTable); + PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalTableName); + + //TODO: update disable indexes + PhoenixConfigurationUtil.setDisableIndexes(configuration, StringUtils.join(",",disableIndexes)); + + final Job job = Job.getInstance(configuration, jobName); + if (outputPath != null) { + FileOutputFormat.setOutputPath(job, outputPath); + } + job.setJarByClass(IndexTool.class); + TableMapReduceUtil.initTableMapperJob(physicalTableName, scan, PhoenixIndexPartialBuildMapper.class, null, + null, job); + TableMapReduceUtil.initCredentials(job); + TableInputFormat.configureSplitTable(job, TableName.valueOf(physicalTableName)); + return configureSubmittableJobUsingDirectApi(job, true); + } + + private long getMaxRebuildAsyncDate(String schemaName, List<String> disableIndexes) throws SQLException { + Long maxRebuilAsyncDate=HConstants.LATEST_TIMESTAMP; + Long maxDisabledTimeStamp=0L; + if (disableIndexes == null || disableIndexes.isEmpty()) { return 0; } + List<String> quotedIndexes = new ArrayList<String>(disableIndexes.size()); + for (String index : disableIndexes) { + quotedIndexes.add("'" + index + "'"); + } + ResultSet rs = connection.createStatement() + .executeQuery("SELECT MAX(" + ASYNC_REBUILD_TIMESTAMP + "),MAX("+INDEX_DISABLE_TIMESTAMP+") FROM " + SYSTEM_CATALOG_NAME + " (" + + ASYNC_REBUILD_TIMESTAMP + " BIGINT) WHERE " + TABLE_SCHEM + + (schemaName != null && schemaName.length() > 0 ? 
"='" + schemaName + "'" : " IS NULL") + + " and " + TABLE_NAME + " IN (" + StringUtils.join(",", quotedIndexes) + ")"); + if (rs.next()) { + maxRebuilAsyncDate = rs.getLong(1); + maxDisabledTimeStamp = rs.getLong(2); + } + // Do check if table is disabled again after user invoked async rebuilding during the run of the job + if (maxRebuilAsyncDate > maxDisabledTimeStamp) { + return maxRebuilAsyncDate; + } else { + throw new RuntimeException( + "Inconsistent state we have one or more index tables which are disabled after the async is called!!"); } + + } + private Job configureJobForAysncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi) + throws Exception { + final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); + final String qIndexTable; + if (schemaName != null && !schemaName.isEmpty()) { + qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable); + } else { + qIndexTable = indexTable; + } final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); + final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable); - + + long maxTimeRange = pindexTable.getTimeStamp() + 1; // this is set to ensure index tables remains consistent post population. - long maxTimeRange = pindexTable.getTimeStamp()+1; + if (pdataTable.isTransactional()) { configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE, Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))); } configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(maxTimeRange)); - + // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is // computed from the qDataTable name. String physicalIndexTable = pindexTable.getPhysicalName().getString(); - boolean isLocalIndexBuild = false; - if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { - physicalIndexTable = qDataTable; - isLocalIndexBuild = true; - } + final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class); final PostIndexDDLCompiler ddlCompiler = @@ -224,18 +351,15 @@ public class IndexTool extends Configured implements Tool { configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery); PhoenixConfigurationUtil.setPhysicalTableName(configuration, physicalIndexTable); - PhoenixConfigurationUtil.setOutputTableName(configuration, indexTable); + PhoenixConfigurationUtil.setDisableIndexes(configuration, indexTable); PhoenixConfigurationUtil.setUpsertColumnNames(configuration, indexColumns.toArray(new String[indexColumns.size()])); final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns); ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); - - final Path outputPath = CsvBulkImportUtil - .getOutputPath(new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt())), physicalIndexTable); FileSystem.get(configuration).delete(outputPath, true); - final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, dataTable, indexTable); + final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, pdataTable.getName().toString(), indexTable); final Job job = Job.getInstance(configuration, jobName); job.setJarByClass(IndexTool.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); @@ -245,17 +369,146 @@ public class IndexTool extends Configured implements Tool { selectQuery); TableMapReduceUtil.initCredentials(job); - boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); + if (useDirectApi) { - 
configureSubmittableJobUsingDirectApi(job, outputPath, - cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt())); + return configureSubmittableJobUsingDirectApi(job, false); } else { - configureRunnableJobUsingBulkLoad(job, outputPath, isLocalIndexBuild); - // Without direct API, we need to update the index state to ACTIVE from client. - IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, - PIndexState.ACTIVE); + return configureRunnableJobUsingBulkLoad(job, outputPath); + + } + + } + + /** + * Submits the job and waits for completion. + * @param job + * @param outputPath + * @return + * @throws Exception + */ + private Job configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception { + job.setMapperClass(PhoenixIndexImportMapper.class); + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(KeyValue.class); + final Configuration configuration = job.getConfiguration(); + final String physicalIndexTable = + PhoenixConfigurationUtil.getPhysicalTableName(configuration); + final HTable htable = new HTable(configuration, physicalIndexTable); + HFileOutputFormat.configureIncrementalLoad(job, htable); + return job; + + } + + /** + * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or + * waits for the job completion based on runForeground parameter. + * + * @param job + * @param outputPath + * @param runForeground - if true, waits for job completion, else submits and returns + * immediately. + * @return + * @throws Exception + */ + private Job configureSubmittableJobUsingDirectApi(Job job, boolean isPartialRebuild) + throws Exception { + if (!isPartialRebuild) { + //Don't configure mapper for partial build as it is configured already + job.setMapperClass(PhoenixIndexImportDirectMapper.class); + } + job.setReducerClass(PhoenixIndexImportDirectReducer.class); + Configuration conf = job.getConfiguration(); + HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); + // Set the Physical Table name for use in DirectHTableWriter#write(Mutation) + conf.set(TableOutputFormat.OUTPUT_TABLE, + PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration())); + //Set the Output classes + job.setMapOutputKeyClass(ImmutableBytesWritable.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(NullWritable.class); + TableMapReduceUtil.addDependencyJars(job); + job.setNumReduceTasks(1); + return job; + } + + } + + @Override + public int run(String[] args) throws Exception { + Connection connection = null; + HTable htable = null; + try { + CommandLine cmdLine = null; + try { + cmdLine = parseOptions(args); + } catch (IllegalStateException e) { + printHelpAndExit(e.getMessage(), getOptions()); + } + final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf()); + final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt()); + final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt()); + final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt()); + final boolean isPartialBuild = cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()); + final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); + boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); + String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); + boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); + connection = 
ConnectionUtil.getInputConnection(configuration); + byte[][] splitKeysBeforeJob = null; + boolean isLocalIndexBuild = false; + PTable pindexTable = null; + if (indexTable != null) { + if (!isValidIndexTable(connection, qDataTable,indexTable)) { + throw new IllegalArgumentException(String.format( + " %s is not an index table for %s ", indexTable, qDataTable)); + } + pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty() + ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable); + htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(pindexTable.getPhysicalName().getBytes()); + if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { + isLocalIndexBuild = true; + splitKeysBeforeJob = htable.getRegionLocator().getStartKeys(); + } + } + + PTable pdataTable = PhoenixRuntime.getTableNoCache(connection, qDataTable); + Path outputPath = null; + if (basePath != null) { + outputPath = CsvBulkImportUtil.getOutputPath(new Path(basePath), pindexTable == null + ? pdataTable.getPhysicalName().getString() : pindexTable.getPhysicalName().getString()); + FileSystem.get(configuration).delete(outputPath, true); + } + + Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable, + useDirectApi, isPartialBuild); + if (!isForeground && useDirectApi) { + LOG.info("Running Index Build in Background - Submit async and exit"); + job.submit(); + return 0; + } + LOG.info("Running Index Build in Foreground. Waits for the build to complete. This may take a long time!."); + boolean result = job.waitForCompletion(true); + + if (result) { + if (!useDirectApi && indexTable != null) { + if (isLocalIndexBuild) { + validateSplitForLocalIndex(splitKeysBeforeJob, htable); + } + LOG.info("Loading HFiles from {}", outputPath); + LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration); + loader.doBulkLoad(outputPath, htable); + htable.close(); + // Without direct API, we need to update the index state to ACTIVE from client. + IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE); + FileSystem.get(configuration).delete(outputPath, true); + } + return 0; + } else { + LOG.error("IndexTool job failed! Check logs for errors.."); + return -1; } - return 0; } catch (Exception ex) { LOG.error("An exception occurred while performing the indexing job: " + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex)); @@ -265,6 +518,9 @@ public class IndexTool extends Configured implements Tool { if (connection != null) { connection.close(); } + if (htable != null) { + htable.close(); + } } catch (SQLException sqle) { LOG.error("Failed to close connection ", sqle.getMessage()); throw new RuntimeException("Failed to close connection"); @@ -272,91 +528,18 @@ public class IndexTool extends Configured implements Tool { } } - /** - * Submits the job and waits for completion. 
- * @param job - * @param outputPath - * @return - * @throws Exception - */ - private void configureRunnableJobUsingBulkLoad(Job job, Path outputPath, boolean isLocalIndexBuild) throws Exception { - job.setMapperClass(PhoenixIndexImportMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - final Configuration configuration = job.getConfiguration(); - final String physicalIndexTable = - PhoenixConfigurationUtil.getPhysicalTableName(configuration); - final HTable htable = new HTable(configuration, physicalIndexTable); - HFileOutputFormat.configureIncrementalLoad(job, htable); - byte[][] splitKeysBeforeJob = null; - if(isLocalIndexBuild) { - splitKeysBeforeJob = htable.getRegionLocator().getStartKeys(); - } - boolean status = job.waitForCompletion(true); - if (!status) { - LOG.error("IndexTool job failed!"); - htable.close(); - throw new Exception("IndexTool job failed: " + job.toString()); - } else { - if (isLocalIndexBuild - && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getRegionLocator() - .getStartKeys())) { - String errMsg = "The index to build is local index and the split keys are not matching" - + " before and after running the job. Please rerun the job otherwise" - + " there may be inconsistencies between actual data and index data"; - LOG.error(errMsg); - throw new Exception(errMsg); - } - } - - LOG.info("Loading HFiles from {}", outputPath); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration); - loader.doBulkLoad(outputPath, htable); - htable.close(); - - FileSystem.get(configuration).delete(outputPath, true); - } - /** - * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or - * waits for the job completion based on runForeground parameter. - * - * @param job - * @param outputPath - * @param runForeground - if true, waits for job completion, else submits and returns - * immediately. - * @return - * @throws Exception - */ - private void configureSubmittableJobUsingDirectApi(Job job, Path outputPath, boolean runForeground) - throws Exception { - job.setMapperClass(PhoenixIndexImportDirectMapper.class); - job.setReducerClass(PhoenixIndexImportDirectReducer.class); - Configuration conf = job.getConfiguration(); - HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); - // Set the Physical Table name for use in DirectHTableWriter#write(Mutation) - conf.set(TableOutputFormat.OUTPUT_TABLE, - PhoenixConfigurationUtil.getPhysicalTableName(job.getConfiguration())); - - //Set the Output classes - job.setMapOutputValueClass(IntWritable.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(NullWritable.class); - TableMapReduceUtil.addDependencyJars(job); - job.setNumReduceTasks(1); - - if (!runForeground) { - LOG.info("Running Index Build in Background - Submit async and exit"); - job.submit(); - return; - } - LOG.info("Running Index Build in Foreground. Waits for the build to complete. 
This may take a long time!.");
-        boolean result = job.waitForCompletion(true);
-        if (!result) {
-            LOG.error("IndexTool job failed!");
-            throw new Exception("IndexTool job failed: " + job.toString());
+
+    private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, HTable htable) throws Exception {
+        if (splitKeysBeforeJob != null
+                && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getRegionLocator().getStartKeys())) {
+            String errMsg = "The index to build is local index and the split keys are not matching"
+                    + " before and after running the job. Please rerun the job otherwise"
+                    + " there may be inconsistencies between actual data and index data";
+            LOG.error(errMsg);
+            throw new Exception(errMsg);
         }
-        FileSystem.get(conf).delete(outputPath, true);
+        return true;
     }

     /**
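[Editorial note, not part of the patch] To see how the options added above fit together, here is a minimal, hypothetical driver sketch. MY_TABLE and MY_INDEX are placeholder names, and it assumes the long option strings defined in getOptions() are accepted in "--long-option value" form; per the new validation, --partial-rebuild must not be combined with --index-table, and an output path is only mandatory when neither --partial-rebuild nor --direct is given.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class IndexToolUsageSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();

            // Partial rebuild: no index name is passed; every index of MY_TABLE that is
            // in BUILDING state is rebuilt from the smallest INDEX_DISABLE_TIMESTAMP,
            // writing through the HBase front door (no HFile output path needed).
            int rc = ToolRunner.run(conf, new IndexTool(),
                    new String[] { "--data-table", "MY_TABLE", "--partial-rebuild" });

            // Full build of a single (e.g. ASYNC-created) index using the direct API;
            // without requesting foreground execution the MR job is submitted and the
            // call returns immediately.
            if (rc == 0) {
                rc = ToolRunner.run(conf, new IndexTool(),
                        new String[] { "--data-table", "MY_TABLE", "--index-table", "MY_INDEX", "--direct" });
            }
            System.exit(rc);
        }
    }

An index typically reaches BUILDING state with a non-zero disable timestamp either through a write failure or, with this patch, through something like ALTER INDEX MY_INDEX ON MY_TABLE REBUILD ASYNC (the grammar change itself is outside this excerpt), which also records ASYNC_REBUILD_TIMESTAMP in SYSTEM.CATALOG so that getMaxRebuildAsyncDate() can bound the raw scan used by the partial build.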
http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java index 1058670..2dc7551 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexToolUtil.java @@ -48,11 +48,13 @@ public class IndexToolUtil { */ public static void updateIndexState(Configuration configuration,PIndexState state) throws SQLException { final String masterTable = PhoenixConfigurationUtil.getInputTableName(configuration); - final String indexTable = PhoenixConfigurationUtil.getOutputTableName(configuration); + final String[] indexTables = PhoenixConfigurationUtil.getDisableIndexes(configuration).split(","); final Properties overrideProps = new Properties(); final Connection connection = ConnectionUtil.getOutputConnection(configuration, overrideProps); try { - updateIndexState(connection, masterTable, indexTable , state); + for (String indexTable : indexTables) { + updateIndexState(connection, masterTable, indexTable, state); + } } finally { if(connection != null) { connection.close(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java index 9c64efc..15e55dd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java @@ -99,7 +99,7 @@ public class PhoenixIndexImportDirectMapper extends this.pStatement = connection.prepareStatement(upsertQuery); } catch (SQLException e) { - throw new RuntimeException(e.getMessage()); + throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java new file mode 100644 index 0000000..47a38a7 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.TableMapper; +import org.apache.hadoop.io.IntWritable; +import org.apache.phoenix.cache.ServerCacheClient; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.index.PhoenixIndexCodec; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.PhoenixJobCounters; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * Mapper that hands over rows from data table to the index table. 
+ */ +public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWritable, IntWritable> { + + private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexPartialBuildMapper.class); + + private PhoenixConnection connection; + + private DirectHTableWriter writer; + + private int batchSize; + + private List<Mutation> mutations ; + + private ImmutableBytesPtr maintainers; + + @Override + protected void setup(final Context context) throws IOException, InterruptedException { + super.setup(context); + final Configuration configuration = context.getConfiguration(); + writer = new DirectHTableWriter(configuration); + + try { + final Properties overrideProps = new Properties(); + String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE); + String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE); + if(txScnValue==null && scn!=null) { + overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); + } + connection = ConnectionUtil.getOutputConnection(configuration, overrideProps).unwrap(PhoenixConnection.class); + connection.setAutoCommit(false); + // Get BatchSize + ConnectionQueryServices services = connection.getQueryServices(); + int maxSize = + services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, + QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); + batchSize = Math.min(connection.getMutateBatchSize(), maxSize); + LOG.info("Mutation Batch Size = " + batchSize); + this.mutations = Lists.newArrayListWithExpectedSize(batchSize); + maintainers=new ImmutableBytesPtr(PhoenixConfigurationUtil.getIndexMaintainers(configuration)); + } catch (SQLException e) { + throw new RuntimeException(e.getMessage()); + } + } + + @Override + protected void map(ImmutableBytesWritable row, Result value, Context context) + throws IOException, InterruptedException { + context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1); + try { + byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(maintainers); + byte[] uuidValue = ServerCacheClient.generateId(); + Put put = null; + Delete del = null; + for (Cell cell : value.rawCells()) { + if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put) { + if (put == null) { + put = new Put(CellUtil.cloneRow(cell)); + put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + put.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + put.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + mutations.add(put); + } + put.add(cell); + } else { + if (del == null) { + del = new Delete(CellUtil.cloneRow(cell)); + del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + del.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + del.setAttribute(BaseScannerRegionObserver.IGNORE_NEWER_MUTATIONS, PDataType.TRUE_BYTES); + mutations.add(del); + } + del.addDeleteMarker(cell); + } + } + // Write Mutation Batch + if (context.getCounter(PhoenixJobCounters.INPUT_RECORDS).getValue() % batchSize == 0) { + writeBatch(mutations, context); + mutations.clear(); + } + // Make sure progress is reported to Application Master. 
+ context.progress(); + } catch (SQLException e) { + LOG.error(" Error {} while read/write of a record ", e.getMessage()); + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + throw new RuntimeException(e); + } + } + + private void writeBatch(List<Mutation> mutations, Context context) + throws IOException, SQLException, InterruptedException { + writer.write(mutations); + context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(mutations.size()); + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + try { + // Write the last & final Mutation Batch + if (!mutations.isEmpty()) { + writeBatch(mutations, context); + } + // We are writing some dummy key-value as map output here so that we commit only one + // output to reducer. + context.write(new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), + new IntWritable(0)); + super.cleanup(context); + } catch (SQLException e) { + LOG.error(" Error {} while read/write of a record ", e.getMessage()); + context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1); + throw new RuntimeException(e); + } finally { + if (connection != null) { + try { + connection.close(); + } catch (SQLException e) { + LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ", + e.getMessage()); + } + } + if (writer != null) { + writer.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 2264acd..f3e4450 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -30,7 +30,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable; import org.apache.hadoop.mapreduce.lib.db.DBWritable; @@ -98,6 +100,12 @@ public final class PhoenixConfigurationUtil { public static final String MAPREDUCE_OUTPUT_CLUSTER_QUORUM = "phoneix.mapreduce.output.cluster.quorum"; + public static final String INDEX_DISABLED_TIMESTAMP_VALUE = "phoenix.mr.index.disableTimestamp"; + + public static final String INDEX_MAINTAINERS = "phoenix.mr.index.maintainers"; + + public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes"; + public enum SchemaType { TABLE, QUERY; @@ -428,4 +436,27 @@ public final class PhoenixConfigurationUtil { return ReflectionUtils.newInstance(processorClass, conf); } + + public static byte[] getIndexMaintainers(final Configuration configuration){ + Preconditions.checkNotNull(configuration); + return Base64.decode(configuration.get(INDEX_MAINTAINERS)); + } + + public static void setIndexMaintainers(final Configuration configuration, + final ImmutableBytesWritable indexMetaDataPtr) { + 
Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(indexMetaDataPtr); + configuration.set(INDEX_MAINTAINERS, Base64.encodeBytes(indexMetaDataPtr.get())); + } + + public static void setDisableIndexes(Configuration configuration, String indexName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(indexName); + configuration.set(DISABLED_INDEXES, indexName); + } + + public static String getDisableIndexes(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(DISABLED_INDEXES); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java index fcf817a..11328c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/AlterIndexStatement.java @@ -23,12 +23,14 @@ public class AlterIndexStatement extends SingleTableStatement { private final String dataTableName; private final boolean ifExists; private final PIndexState indexState; + private boolean async; - public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState) { + public AlterIndexStatement(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState indexState, boolean async) { super(indexTableNode,0); this.dataTableName = dataTableName; this.ifExists = ifExists; this.indexState = indexState; + this.async = async; } public String getTableName() { @@ -48,4 +50,8 @@ public class AlterIndexStatement extends SingleTableStatement { return indexState; } + public boolean isAsync() { + return async; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java index 0c60e39..4b65c6a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java @@ -374,8 +374,12 @@ public class ParseNodeFactory { return new DropIndexStatement(indexName, tableName, ifExists); } + public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state, boolean async) { + return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, async); + } + public AlterIndexStatement alterIndex(NamedTableNode indexTableNode, String dataTableName, boolean ifExists, PIndexState state) { - return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state); + return new AlterIndexStatement(indexTableNode, dataTableName, ifExists, state, false); } public TraceStatement trace(boolean isTraceOn, double samplingRate) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 4a565be..1e002d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -127,10 +127,13 @@ public interface QueryServices extends SQLCloseable { // A master switch if to enable auto rebuild an index which failed to be updated previously public static final String INDEX_FAILURE_HANDLING_REBUILD_ATTRIB = "phoenix.index.failure.handling.rebuild"; + public static final String INDEX_FAILURE_HANDLING_REBUILD_PERIOD = "phoenix.index.failure.handling.rebuild.period"; // Time interval to check if there is an index needs to be rebuild public static final String INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB = "phoenix.index.failure.handling.rebuild.interval"; + + public static final String INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE = "phoenix.index.rebuild.batch.perTable"; // A master switch if to block writes when index build failed public static final String INDEX_FAILURE_BLOCK_WRITE = "phoenix.index.failure.block.write"; @@ -227,6 +230,7 @@ public interface QueryServices extends SQLCloseable { public static final String CLIENT_CACHE_ENCODING = "phoenix.table.client.cache.encoding"; public static final String AUTO_UPGRADE_ENABLED = "phoenix.autoupgrade.enabled"; + /** * Get executor service used for parallel scans */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 7f183e9..a3b9b32 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -27,6 +27,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.APPEND_ONLY_SCHEMA import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARG_POSITION; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ARRAY_SIZE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_CREATED_DATE; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.AUTO_PARTITION_SEQ; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BASE_COLUMN_COUNT; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME; @@ -304,16 +305,27 @@ public class MetaDataClient { TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + - INDEX_STATE + + INDEX_STATE + "," + + ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + + ") VALUES (?, ?, ?, ?, ?)"; + + private static final String UPDATE_INDEX_REBUILD_ASYNC_STATE = + "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + + TENANT_ID + "," + + TABLE_SCHEM + "," + + TABLE_NAME + "," + + ASYNC_REBUILD_TIMESTAMP + " " + PLong.INSTANCE.getSqlTypeName() + ") VALUES (?, ?, ?, ?)"; + private static final String UPDATE_INDEX_STATE_TO_ACTIVE = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + TENANT_ID + "," + TABLE_SCHEM + "," + TABLE_NAME + "," + INDEX_STATE + "," + - INDEX_DISABLE_TIMESTAMP + - ") VALUES (?, ?, ?, ?, ?)"; + INDEX_DISABLE_TIMESTAMP +","+ + ASYNC_REBUILD_TIMESTAMP + " " + 
PLong.INSTANCE.getSqlTypeName() + + ") VALUES (?, ?, ?, ?, ?, ?)"; //TODO: merge INSERT_COLUMN_CREATE_TABLE and INSERT_COLUMN_ALTER_TABLE column when // the new major release is out. private static final String INSERT_COLUMN_CREATE_TABLE = @@ -3469,7 +3481,13 @@ public class MetaDataClient { String dataTableName = statement.getTableName(); String schemaName = statement.getTable().getName().getSchemaName(); String indexName = statement.getTable().getName().getTableName(); + boolean isAsync = statement.isAsync(); PIndexState newIndexState = statement.getIndexState(); + if (isAsync && newIndexState != PIndexState.REBUILD) { throw new SQLExceptionInfo.Builder( + SQLExceptionCode.ASYNC_NOT_ALLOWED) + .setMessage(" ASYNC building of index is allowed only with REBUILD index state") + .setSchemaName(schemaName).setTableName(indexName).build().buildException(); } + if (newIndexState == PIndexState.REBUILD) { newIndexState = PIndexState.BUILDING; } @@ -3480,15 +3498,16 @@ public class MetaDataClient { try { if(newIndexState == PIndexState.ACTIVE){ tableUpsert = connection.prepareStatement(UPDATE_INDEX_STATE_TO_ACTIVE); - } else { + }else{ tableUpsert = connection.prepareStatement(UPDATE_INDEX_STATE); } tableUpsert.setString(1, connection.getTenantId() == null ? null : connection.getTenantId().getString()); tableUpsert.setString(2, schemaName); tableUpsert.setString(3, indexName); tableUpsert.setString(4, newIndexState.getSerializedValue()); + tableUpsert.setLong(5, 0); if(newIndexState == PIndexState.ACTIVE){ - tableUpsert.setLong(5, 0); + tableUpsert.setLong(6, 0); } tableUpsert.execute(); } finally { @@ -3515,9 +3534,25 @@ public class MetaDataClient { addTableToCache(result); // Set so that we get the table below with the potentially modified rowKeyOrderOptimizable flag set indexRef.setTable(result.getTable()); + if (newIndexState == PIndexState.BUILDING && isAsync) { + try { + tableUpsert = connection.prepareStatement(UPDATE_INDEX_REBUILD_ASYNC_STATE); + tableUpsert.setString(1, + connection.getTenantId() == null ? 
null : connection.getTenantId().getString()); + tableUpsert.setString(2, schemaName); + tableUpsert.setString(3, indexName); + tableUpsert.setLong(4, result.getTable().getTimeStamp()); + tableUpsert.execute(); + connection.commit(); + } finally { + if (tableUpsert != null) { + tableUpsert.close(); + } + } + } } } - if (newIndexState == PIndexState.BUILDING) { + if (newIndexState == PIndexState.BUILDING && !isAsync) { PTable index = indexRef.getTable(); // First delete any existing rows of the index Long scn = connection.getSCN(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/83827cd8/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 9622880..2a0f3b9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -42,9 +42,13 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.WritableUtils; @@ -54,6 +58,10 @@ import org.apache.phoenix.compile.IndexStatementRewriter; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.compile.WhereCompiler; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; +import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.TupleProjector; @@ -67,10 +75,12 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; import org.apache.phoenix.parse.SelectStatement; +import org.apache.phoenix.protobuf.ProtobufUtil; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException; @@ -78,6 +88,7 @@ import org.apache.phoenix.schema.ColumnRef; import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PIndexState; import 
org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.TableNotFoundException; @@ -87,12 +98,13 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PBinary; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDecimal; +import org.apache.phoenix.schema.types.PLong; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import org.apache.tephra.TxConstants; import com.google.common.collect.Lists; - -import org.apache.tephra.TxConstants; +import com.google.protobuf.ServiceException; public class IndexUtil { public static final String INDEX_COLUMN_NAME_SEP = ":"; @@ -673,13 +685,48 @@ public class IndexUtil { HConstants.NO_NONCE, HConstants.NO_NONCE); } + public static MetaDataMutationResult disableIndexWithTimestamp(String indexTableName, long minTimeStamp, + HTableInterface metaTable, boolean blockWriteRebuildIndex) throws ServiceException, Throwable { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); + // Mimic the Put that gets generated by the client on an update of the index state + Put put = new Put(indexTableKey); + if (blockWriteRebuildIndex) + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, + PIndexState.ACTIVE.getSerializedBytes()); + else + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, + PIndexState.DISABLE.getSerializedBytes()); + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, + PLong.INSTANCE.toBytes(minTimeStamp)); + put.add(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, + PLong.INSTANCE.toBytes(0)); + final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put); + + final Map<byte[], MetaDataResponse> results = metaTable.coprocessorService(MetaDataService.class, indexTableKey, + indexTableKey, new Batch.Call<MetaDataService, MetaDataResponse>() { + @Override + public MetaDataResponse call(MetaDataService instance) throws IOException { + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<MetaDataResponse> rpcCallback = new BlockingRpcCallback<MetaDataResponse>(); + UpdateIndexStateRequest.Builder builder = UpdateIndexStateRequest.newBuilder(); + for (Mutation m : tableMetadata) { + MutationProto mp = ProtobufUtil.toProto(m); + builder.addTableMetadataMutations(mp.toByteString()); + } + instance.updateIndexState(controller, builder.build(), rpcCallback); + if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } + return rpcCallback.get(); + } + }); + if (results.isEmpty()) { throw new IOException("Didn't get expected result size"); } + MetaDataResponse tmpResponse = results.values().iterator().next(); + return MetaDataMutationResult.constructFromProto(tmpResponse); + } + public static boolean matchingSplitKeys(byte[][] splitKeys1, byte[][] splitKeys2) throws IOException { - if (splitKeys1 != null && splitKeys2 != null - && splitKeys1.length == splitKeys2.length) { + if (splitKeys1 != null && splitKeys2 != null && splitKeys1.length == splitKeys2.length) { for (int i = 0; i < splitKeys1.length; i++) { - if (Bytes.compareTo(splitKeys1[i], splitKeys2[i]) != 0) { - return false; - } + if (Bytes.compareTo(splitKeys1[i], splitKeys2[i]) != 0) { return false; } } } else { return false;