Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 cce7ab29a -> 124404ae2
http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java new file mode 100644 index 0000000..f3ff39e --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java @@ -0,0 +1,522 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce.index; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.CsvBulkImportUtil; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.parse.HintNode.Hint; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * An MR job to verify that the index table is in sync with the data table.
 *
 * <p>The tool parses its command line, builds one map-only job per scrutiny direction (data table
 * as source, index table as source, or both) through {@link JobFactory}, and then either submits
 * the jobs asynchronously or waits for them, depending on the run-foreground option.
 */
public class IndexScrutinyTool extends Configured implements Tool {

    private static final Logger LOG = LoggerFactory.getLogger(IndexScrutinyTool.class);

    private static final Option SCHEMA_NAME_OPTION =
            new Option("s", "schema", true, "Phoenix schema name (optional)");
    private static final Option DATA_TABLE_OPTION =
            new Option("dt", "data-table", true, "Data table name (mandatory)");
    private static final Option INDEX_TABLE_OPTION =
            new Option("it", "index-table", true,
                    "Index table name (mandatory).");
    private static final Option TIMESTAMP =
            new Option("t", "time", true,
                    "Timestamp in millis used to compare the index and data tables. Defaults to current time minus 60 seconds");

    private static final Option RUN_FOREGROUND_OPTION =
            new Option("runfg", "run-foreground", false, "Applicable on top of -direct option."
                    + "If specified, runs index scrutiny in Foreground. Default - Runs the build in background.");

    private static final Option SNAPSHOT_OPTION = //TODO check if this works
            new Option("snap", "snapshot", false,
                    "If specified, uses Snapshots for async index building (optional)");

    public static final Option BATCH_SIZE_OPTION =
            new Option("b", "batch-size", true, "Number of rows to compare at a time");
    public static final Option SOURCE_TABLE_OPTION =
            new Option("src", "source", true,
                    "Table to use as the source table, whose rows are iterated over and compared to the other table."
                            + " Options are DATA_TABLE_SOURCE, INDEX_TABLE_SOURCE, BOTH."
                            + " Defaults to BOTH, which does two separate jobs to iterate over both tables");

    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");

    private static final Option OUTPUT_INVALID_ROWS_OPTION =
            new Option("o", "output", false, "Whether to output invalid rows");
    private static final Option OUTPUT_FORMAT_OPTION =
            new Option("of", "output-format", true,
                    "Format in which to output invalid rows. Options are FILE, TABLE. Defaults to TABLE");
    private static final Option OUTPUT_PATH_OPTION =
            new Option("op", "output-path", true, "Output path where the files are written");
    private static final Option OUTPUT_MAX = new Option("om", "output-max", true, "Max number of invalid rows to output per mapper. Defaults to 1M");
    public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_SCRUTINY_[%s]_[%s]";

    /**
     * Which table to use as the source table
     */
    public static enum SourceTable {
        DATA_TABLE_SOURCE, INDEX_TABLE_SOURCE,
        /**
         * Runs two separate jobs to iterate over both tables
         */
        BOTH;
    }

    /**
     * Where invalid rows are written when output is enabled.
     */
    public static enum OutputFormat {
        FILE, TABLE
    }

    /** Jobs created by the most recent {@link #run(String[])} call, in creation order. */
    private List<Job> jobs = Lists.newArrayList();

    /**
     * @return every command line option this tool understands
     */
    private Options getOptions() {
        final Options options = new Options();
        options.addOption(SCHEMA_NAME_OPTION);
        options.addOption(DATA_TABLE_OPTION);
        options.addOption(INDEX_TABLE_OPTION);
        options.addOption(RUN_FOREGROUND_OPTION);
        options.addOption(OUTPUT_INVALID_ROWS_OPTION);
        options.addOption(OUTPUT_FORMAT_OPTION);
        options.addOption(OUTPUT_PATH_OPTION);
        options.addOption(OUTPUT_MAX);
        options.addOption(SNAPSHOT_OPTION);
        options.addOption(HELP_OPTION);
        options.addOption(TIMESTAMP);
        options.addOption(BATCH_SIZE_OPTION);
        options.addOption(SOURCE_TABLE_OPTION);
        return options;
    }

    /**
     * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
     * missing.
     * @param args supplied command line arguments
     * @return the parsed command line
     */
    private CommandLine parseOptions(String[] args) {
        final Options options = getOptions();

        CommandLineParser parser = new PosixParser();
        CommandLine cmdLine = null;
        try {
            cmdLine = parser.parse(options, args);
        } catch (ParseException e) {
            // prints usage and terminates the JVM, so cmdLine is not dereferenced while null
            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
        }

        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
            printHelpAndExit(options, 0);
        }

        requireOption(cmdLine, DATA_TABLE_OPTION);
        requireOption(cmdLine, INDEX_TABLE_OPTION);

        return cmdLine;
    }

    /**
     * @throws IllegalStateException if {@code option} was not supplied on the command line
     */
    private void requireOption(CommandLine cmdLine, Option option) {
        if (!cmdLine.hasOption(option.getOpt())) {
            throw new IllegalStateException(option.getLongOpt() + " is a mandatory parameter");
        }
    }

    /** Prints {@code errorMessage} to stderr, then the usage text, and exits with status 1. */
    private void printHelpAndExit(String errorMessage, Options options) {
        System.err.println(errorMessage);
        printHelpAndExit(options, 1);
    }

    /** Prints the usage text and terminates the JVM with {@code exitCode}. */
    private void printHelpAndExit(Options options, int exitCode) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("help", options);
        System.exit(exitCode);
    }

    /**
     * Builds the scrutiny MR jobs. All jobs created from one factory share the same connection,
     * configuration and scrutiny execute timestamp.
     */
    class JobFactory {
        Connection connection;
        Configuration configuration;
        private boolean useSnapshot;
        private long ts;
        private boolean outputInvalidRows;
        private OutputFormat outputFormat;
        private String basePath;
        private long scrutinyExecuteTime;
        private long outputMaxRows; // per mapper

        /**
         * Stores the settings and writes the batch size and the scrutiny execute timestamp into
         * {@code configuration}.
         * @param ts timestamp later set as CURRENT_SCN for the scrutiny scan
         * @param basePath base output path; may be null, in which case no output path is computed
         * @param outputMaxRows max number of invalid rows to output, per mapper
         */
        public JobFactory(Connection connection, Configuration configuration, long batchSize,
                boolean useSnapshot, long ts, boolean outputInvalidRows, OutputFormat outputFormat,
                String basePath, long outputMaxRows) {
            this.outputInvalidRows = outputInvalidRows;
            this.outputFormat = outputFormat;
            this.basePath = basePath;
            this.outputMaxRows = outputMaxRows;
            PhoenixConfigurationUtil.setScrutinyBatchSize(configuration, batchSize);
            this.connection = connection;
            this.configuration = configuration;
            this.useSnapshot = useSnapshot;
            this.ts = ts; // CURRENT_SCN to set
            // time at which scrutiny was run. Same for all jobs created from this factory
            scrutinyExecuteTime = EnvironmentEdgeManager.currentTimeMillis();
            PhoenixConfigurationUtil.setScrutinyExecuteTimestamp(configuration,
                scrutinyExecuteTime);
        }

        /**
         * Creates a configured, not yet submitted, scrutiny job for one direction.
         * @param schemaName schema of the tables; may be null or empty
         * @param indexTable index table name
         * @param dataTable data table name
         * @param sourceTable must be DATA_TABLE_SOURCE or INDEX_TABLE_SOURCE (not BOTH)
         * @return the job, ready to be submitted
         * @throws IllegalArgumentException if {@code sourceTable} is neither single-table value
         */
        public Job createSubmittableJob(String schemaName, String indexTable, String dataTable,
                SourceTable sourceTable) throws Exception {
            Preconditions.checkArgument(SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
                    || SourceTable.INDEX_TABLE_SOURCE.equals(sourceTable));

            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
            final String qIndexTable;
            if (schemaName != null && !schemaName.isEmpty()) {
                qIndexTable = SchemaUtil.getQualifiedTableName(schemaName, indexTable);
            } else {
                qIndexTable = indexTable;
            }
            PhoenixConfigurationUtil.setScrutinyDataTable(configuration, qDataTable);
            PhoenixConfigurationUtil.setScrutinyIndexTable(configuration, qIndexTable);
            PhoenixConfigurationUtil.setScrutinySourceTable(configuration, sourceTable);
            PhoenixConfigurationUtil.setScrutinyOutputInvalidRows(configuration, outputInvalidRows);
            PhoenixConfigurationUtil.setScrutinyOutputMax(configuration, outputMaxRows);

            final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable);
            final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);

            // set CURRENT_SCN for our scan so that incoming writes don't throw off scrutiny
            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(ts));

            // set the source table to either data or index table
            SourceTargetColumnNames columnNames =
                    SourceTable.DATA_TABLE_SOURCE.equals(sourceTable)
                            ? new SourceTargetColumnNames.DataSourceColNames(pdataTable,
                                    pindexTable)
                            : new SourceTargetColumnNames.IndexSourceColNames(pdataTable,
                                    pindexTable);
            String qSourceTable = columnNames.getQualifiedSourceTableName();
            List<String> sourceColumnNames = columnNames.getSourceColNames();
            List<String> sourceDynamicCols = columnNames.getSourceDynamicCols();
            List<String> targetDynamicCols = columnNames.getTargetDynamicCols();

            // Setup the select query against source - we either select the index columns from the
            // index table,
            // or select the data table equivalents of the index columns from the data table
            final String selectQuery =
                    QueryUtil.constructSelectStatement(qSourceTable, sourceColumnNames, null,
                        Hint.NO_INDEX, true);
            LOG.info("Query used on source table to feed the mapper: " + selectQuery);

            PhoenixConfigurationUtil.setScrutinyOutputFormat(configuration, outputFormat);
            // if outputting to table, setup the upsert to the output table
            if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
                String upsertStmt =
                        IndexScrutinyTableOutput.constructOutputTableUpsert(sourceDynamicCols,
                            targetDynamicCols, connection);
                PhoenixConfigurationUtil.setUpsertStatement(configuration, upsertStmt);
                LOG.info("Upsert statement used for output table: " + upsertStmt);
            }

            final String jobName =
                    String.format(INDEX_JOB_NAME_TEMPLATE, qSourceTable,
                        columnNames.getQualifiedTargetTableName());
            final Job job = Job.getInstance(configuration, jobName);

            if (!useSnapshot) {
                PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable,
                    selectQuery);
            } else { // TODO check if using a snapshot works
                // NOTE(review): this branch takes a snapshot and changes the root dir on the
                // shared configuration, but the setInput call below is commented out, so no
                // input is configured on the job here - confirm before relying on -snap.
                HBaseAdmin admin = null;
                String snapshotName;
                try {
                    final PhoenixConnection pConnection =
                            connection.unwrap(PhoenixConnection.class);
                    admin = pConnection.getQueryServices().getAdmin();
                    String pdataTableName = pdataTable.getName().getString();
                    snapshotName = new StringBuilder(pdataTableName).append("-Snapshot").toString();
                    admin.snapshot(snapshotName, TableName.valueOf(pdataTableName));
                } finally {
                    if (admin != null) {
                        admin.close();
                    }
                }
                // root dir not a subdirectory of hbase dir
                Path rootDir = new Path("hdfs:///index-snapshot-dir");
                FSUtils.setRootDir(configuration, rootDir);
                Path restoreDir = new Path(FSUtils.getRootDir(configuration), "restore-dir");

                // set input for map reduce job using hbase snapshots
                //PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, snapshotName,
                //    qDataTable, restoreDir, selectQuery);
            }
            TableMapReduceUtil.initCredentials(job);
            Path outputPath =
                    getOutputPath(configuration, basePath,
                        SourceTable.DATA_TABLE_SOURCE.equals(sourceTable) ? pdataTable
                                : pindexTable);

            return configureSubmittableJob(job, outputPath);
        }

        /**
         * Applies the map-only job settings: mapper class, zero reducers, output format (text
         * files only when invalid rows are output in FILE format, otherwise no output) and
         * dependency jars.
         */
        private Job configureSubmittableJob(Job job, Path outputPath) throws Exception {
            Configuration conf = job.getConfiguration();
            conf.setBoolean("mapreduce.job.user.classpath.first", true);
            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
            job.setJarByClass(IndexScrutinyTool.class);
            job.setOutputFormatClass(NullOutputFormat.class);
            if (outputInvalidRows && OutputFormat.FILE.equals(outputFormat)) {
                job.setOutputFormatClass(TextOutputFormat.class);
                FileOutputFormat.setOutputPath(job, outputPath);
            }
            job.setMapperClass(IndexScrutinyMapper.class);
            job.setNumReduceTasks(0);
            // Set the Output classes
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            TableMapReduceUtil.addDependencyJars(job);
            return job;
        }

        /**
         * Computes the output path for {@code table} under {@code basePath} and recursively
         * deletes whatever already exists there.
         * @return the output path, or null when {@code basePath} is null
         */
        Path getOutputPath(final Configuration configuration, String basePath, PTable table)
                throws IOException {
            Path outputPath = null;
            FileSystem fs;
            if (basePath != null) {
                outputPath =
                        CsvBulkImportUtil.getOutputPath(new Path(basePath),
                            table.getPhysicalName().getString());
                fs = outputPath.getFileSystem(configuration);
                fs.delete(outputPath, true);
            }
            return outputPath;
        }
    }

    /**
     * Parses the arguments, creates the scrutiny job(s) and runs them.
     * @return 0 when the jobs were submitted (background) or all completed successfully
     *         (foreground); -1 when a foreground job failed or any exception was caught
     */
    @Override
    public int run(String[] args) throws Exception {
        Connection connection = null;
        try {
            /** start - parse command line configs **/
            CommandLine cmdLine = null;
            try {
                cmdLine = parseOptions(args);
            } catch (IllegalStateException e) {
                printHelpAndExit(e.getMessage(), getOptions());
            }
            final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
            connection = ConnectionUtil.getInputConnection(configuration);
            final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
            final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
            final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
            final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable);
            String basePath = cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
            boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
            boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
            boolean outputInvalidRows = cmdLine.hasOption(OUTPUT_INVALID_ROWS_OPTION.getOpt());
            SourceTable sourceTable =
                    cmdLine.hasOption(SOURCE_TABLE_OPTION.getOpt())
                            ? SourceTable
                                    .valueOf(cmdLine.getOptionValue(SOURCE_TABLE_OPTION.getOpt()))
                            : SourceTable.BOTH;

            long batchSize =
                    cmdLine.hasOption(BATCH_SIZE_OPTION.getOpt())
                            ? Long.parseLong(cmdLine.getOptionValue(BATCH_SIZE_OPTION.getOpt()))
                            : PhoenixConfigurationUtil.DEFAULT_SCRUTINY_BATCH_SIZE;

            long ts =
                    cmdLine.hasOption(TIMESTAMP.getOpt())
                            ? Long.parseLong(cmdLine.getOptionValue(TIMESTAMP.getOpt()))
                            : EnvironmentEdgeManager.currentTimeMillis() - 60000;

            if (indexTable != null) {
                if (!isValidIndexTable(connection, qDataTable, indexTable)) {
                    throw new IllegalArgumentException(String
                            .format(" %s is not an index table for %s ", indexTable, qDataTable));
                }
            }

            String outputFormatOption = cmdLine.getOptionValue(OUTPUT_FORMAT_OPTION.getOpt());
            OutputFormat outputFormat =
                    outputFormatOption != null
                            ? OutputFormat.valueOf(outputFormatOption.toUpperCase())
                            : OutputFormat.TABLE;
            long outputMaxRows =
                    cmdLine.hasOption(OUTPUT_MAX.getOpt())
                            ? Long.parseLong(cmdLine.getOptionValue(OUTPUT_MAX.getOpt()))
                            : 1000000L;
            /** end - parse command line configs **/

            if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
                // create the output table if it doesn't exist
                try (Connection outputConn = ConnectionUtil.getOutputConnection(configuration)) {
                    outputConn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL);
                    outputConn.createStatement()
                            .execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL);
                }
            }

            LOG.info(String.format(
                "Running scrutiny [schemaName=%s, dataTable=%s, indexTable=%s, useSnapshot=%s, timestamp=%s, batchSize=%s, outputBasePath=%s, outputFormat=%s, outputMaxRows=%s]",
                schemaName, dataTable, indexTable, useSnapshot, ts, batchSize, basePath,
                outputFormat, outputMaxRows));
            JobFactory jobFactory =
                    new JobFactory(connection, configuration, batchSize, useSnapshot, ts,
                            outputInvalidRows, outputFormat, basePath, outputMaxRows);
            // If we are running the scrutiny with both tables as the source, run two separate jobs,
            // one for each direction
            if (SourceTable.BOTH.equals(sourceTable)) {
                jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
                    SourceTable.DATA_TABLE_SOURCE));
                jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
                    SourceTable.INDEX_TABLE_SOURCE));
            } else {
                jobs.add(jobFactory.createSubmittableJob(schemaName, indexTable, dataTable,
                    sourceTable));
            }

            if (!isForeground) {
                LOG.info("Running Index Scrutiny in Background - Submit async and exit");
                for (Job job : jobs) {
                    job.submit();
                }
                return 0;
            }
            LOG.info(
                "Running Index Scrutiny in Foreground. Waits for the build to complete. This may take a long time!.");
            boolean result = true;
            for (Job job : jobs) {
                // && short-circuits: once a job has failed, later jobs are not waited on
                result = result && job.waitForCompletion(true);
            }

            // write the results to the output metadata table
            if (outputInvalidRows && OutputFormat.TABLE.equals(outputFormat)) {
                LOG.info("Writing results of jobs to output table "
                        + IndexScrutinyTableOutput.OUTPUT_METADATA_TABLE_NAME);
                IndexScrutinyTableOutput.writeJobResults(connection, args, jobs);
            }

            if (result) {
                return 0;
            } else {
                LOG.error("IndexScrutinyTool job failed! Check logs for errors..");
                return -1;
            }
        } catch (Exception ex) {
            LOG.error("An exception occurred while performing the indexing job: "
                    + ExceptionUtils.getMessage(ex) + " at:\n" + ExceptionUtils.getStackTrace(ex));
            return -1;
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (SQLException sqle) {
                // Pass the exception itself: the message has no {} placeholder, so a plain
                // string argument would be dropped by the logger. Keep the cause on the
                // rethrown exception as well. Note that throwing here replaces the return value.
                LOG.error("Failed to close connection ", sqle);
                throw new RuntimeException("Failed to close connection", sqle);
            }
        }
    }

    /**
     * @return the jobs created by {@link #run(String[])}
     */
    @VisibleForTesting
    public List<Job> getJobs() {
        return jobs;
    }

    /**
     * Checks for the validity of the index table passed to the job.
     * @param connection connection used to read the database metadata
     * @param masterTable full (schema-qualified) name of the data table
     * @param indexTable index table name to look for, compared case-insensitively
     * @return true if {@code indexTable} is listed among the indexes of {@code masterTable}
     * @throws SQLException if the metadata lookup fails
     */
    private boolean isValidIndexTable(final Connection connection, final String masterTable,
            final String indexTable) throws SQLException {
        final DatabaseMetaData dbMetaData = connection.getMetaData();
        final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
        final String tableName =
                SchemaUtil.normalizeIdentifier(SchemaUtil.getTableNameFromFullName(masterTable));

        try (ResultSet rs = dbMetaData.getIndexInfo(null, schemaName, tableName, false, false)) {
            while (rs.next()) {
                // column 6 of the getIndexInfo result is INDEX_NAME in the JDBC specification
                final String indexName = rs.getString(6);
                if (indexTable.equalsIgnoreCase(indexName)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(final String[] args) throws Exception {
        int result = ToolRunner.run(new IndexScrutinyTool(), args);
        System.exit(result);
    }

}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java index 2be810a..e426390 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java @@ -23,6 +23,7 @@ import java.sql.SQLException; import java.util.List; import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.util.ColumnInfo; import com.google.common.base.Preconditions; @@ -40,6 +41,8 @@ public class PhoenixIndexDBWritable implements DBWritable { private List<Object> values;
private int columnCount = -1; + + private long rowTs = -1; @Override public void write(PreparedStatement statement) throws SQLException { @@ -63,7 +66,9 @@ public class PhoenixIndexDBWritable implements DBWritable { if(columnCount == -1) { this.columnCount = resultSet.getMetaData().getColumnCount(); } - + if (columnCount > 0) { + this.rowTs = resultSet.unwrap(PhoenixResultSet.class).getCurrentRow().getValue(0).getTimestamp(); + } values = Lists.newArrayListWithCapacity(columnCount); for(int i = 0 ; i < columnCount ; i++) { Object value = resultSet.getObject(i + 1); @@ -88,4 +93,8 @@ public class PhoenixIndexDBWritable implements DBWritable { this.values = values; } + public long getRowTs() { + return rowTs; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java new file mode 100644 index 0000000..3cf73fd --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixScrutinyJobCounters.java @@ -0,0 +1,41 @@ +/** +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.phoenix.mapreduce.index;

/**
 * Names of the counters reported by the Index Scrutiny MR job.
 */
public enum PhoenixScrutinyJobCounters {

    /** Count of data table rows that have a valid index row (or vice-versa). */
    VALID_ROW_COUNT,

    /** Count of data table rows that have an invalid index row (or vice-versa). */
    INVALID_ROW_COUNT,

    /** Count of index table rows holding an incorrect covered column value. */
    BAD_COVERED_COL_VAL_COUNT,

    /** Count of batches processed. */
    BATCHES_PROCESSED_COUNT
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/SourceTargetColumnNames.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/SourceTargetColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/SourceTargetColumnNames.java new file mode 100644 index 0000000..1c7991f --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/SourceTargetColumnNames.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.phoenix.mapreduce.index;

import java.util.List;

import org.apache.phoenix.mapreduce.util.IndexColumnNames;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.SchemaUtil;

/**
 * Source/target view of the index scrutiny column names. Which physical table (data or index)
 * plays the "source" role and which the "target" role depends on the implementation chosen.
 */
public interface SourceTargetColumnNames {

    // ---- table names ----

    String getQualifiedSourceTableName();

    String getQualifiedTargetTableName();

    // ---- source side ----

    List<String> getSourceColNames();

    List<String> getUnqualifiedSourceColNames();

    List<String> getSourcePkColNames();

    List<String> getSourceDynamicCols();

    // ---- target side ----

    List<String> getTargetColNames();

    List<String> getUnqualifiedTargetColNames();

    List<String> getTargetPkColNames();

    List<String> getTargetDynamicCols();

    /**
     * @return The target column name with a CAST to the source's data type
     */
    List<String> getCastedTargetColNames();

    /**
     * Used when the data table is the source table of a scrutiny: every "source" accessor
     * delegates to the data-table accessor of {@link IndexColumnNames} and every "target"
     * accessor to the index-table one.
     */
    public static class DataSourceColNames extends IndexColumnNames
            implements SourceTargetColumnNames {

        /**
         * @param pdataTable the data table
         * @param pindexTable the index table for the data table
         */
        public DataSourceColNames(PTable pdataTable, PTable pindexTable) {
            super(pdataTable, pindexTable);
        }

        @Override
        public String getQualifiedSourceTableName() {
            return getQualifiedDataTableName();
        }

        @Override
        public String getQualifiedTargetTableName() {
            return getQualifiedIndexTableName();
        }

        @Override
        public List<String> getSourceColNames() {
            return getDataColNames();
        }

        @Override
        public List<String> getUnqualifiedSourceColNames() {
            return getUnqualifiedDataColNames();
        }

        @Override
        public List<String> getSourcePkColNames() {
            return getDataPkColNames();
        }

        @Override
        public List<String> getSourceDynamicCols() {
            return getDynamicDataCols();
        }

        @Override
        public List<String> getTargetColNames() {
            return getIndexColNames();
        }

        @Override
        public List<String> getUnqualifiedTargetColNames() {
            return getUnqualifiedIndexColNames();
        }

        @Override
        public List<String> getTargetPkColNames() {
            return getIndexPkColNames();
        }

        @Override
        public List<String> getTargetDynamicCols() {
            return getDynamicIndexCols();
        }

        @Override
        public List<String> getCastedTargetColNames() {
            // index columns, cast to the data (source) column types
            return getCastedColumnNames(getIndexColNames(), dataColSqlTypeNames);
        }
    }

    /**
     * Used when the index table is the source table of a scrutiny: every "source" accessor
     * delegates to the index-table accessor of {@link IndexColumnNames} and every "target"
     * accessor to the data-table one.
     */
    public static class IndexSourceColNames extends IndexColumnNames
            implements SourceTargetColumnNames {

        /**
         * @param pdataTable the data table
         * @param pindexTable the index table for the data table
         */
        public IndexSourceColNames(PTable pdataTable, PTable pindexTable) {
            super(pdataTable, pindexTable);
        }

        @Override
        public String getQualifiedSourceTableName() {
            return getQualifiedIndexTableName();
        }

        @Override
        public String getQualifiedTargetTableName() {
            return getQualifiedDataTableName();
        }

        @Override
        public List<String> getSourceColNames() {
            return getIndexColNames();
        }

        @Override
        public List<String> getUnqualifiedSourceColNames() {
            return getUnqualifiedIndexColNames();
        }

        @Override
        public List<String> getSourcePkColNames() {
            return getIndexPkColNames();
        }

        @Override
        public List<String> getSourceDynamicCols() {
            return getDynamicIndexCols();
        }

        @Override
        public List<String> getTargetColNames() {
            return getDataColNames();
        }

        @Override
        public List<String> getUnqualifiedTargetColNames() {
            return getUnqualifiedDataColNames();
        }

        @Override
        public List<String> getTargetPkColNames() {
            return getDataPkColNames();
        }

        @Override
        public List<String> getTargetDynamicCols() {
            return getDynamicDataCols();
        }

        @Override
        public List<String> getCastedTargetColNames() {
            // data columns, cast to the index (source) column types
            return getCastedColumnNames(getDataColNames(), indexColSqlTypeNames);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java new file mode 100644 index 0000000..5daa1ed --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/IndexColumnNames.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.phoenix.mapreduce.util;

import java.sql.Types;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.SchemaUtil;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

/**
 * Gets index column names and their data table equivalents.
 *
 * <p>The name lists and the SQL type lists are filled in the same order, so position i of a name
 * list corresponds to position i of the matching type list.
 */
public class IndexColumnNames {
    private List<String> dataNonPkColNames = Lists.newArrayList();
    private List<String> dataPkColNames = Lists.newArrayList();
    private List<String> dataColNames;
    protected List<String> dataColSqlTypeNames = Lists.newArrayList();
    private List<String> indexPkColNames = Lists.newArrayList();
    private List<String> indexNonPkColNames = Lists.newArrayList();
    private List<String> indexColNames;
    protected List<String> indexColSqlTypeNames = Lists.newArrayList();
    private PTable pdataTable;
    private PTable pindexTable;

    /**
     * Walks the index table columns in three passes (data pk columns, remaining index pk columns,
     * then all remaining columns) and records each index column together with its data table
     * equivalent.
     * @param pdataTable the data table
     * @param pindexTable the index table for the data table
     */
    public IndexColumnNames(final PTable pdataTable, final PTable pindexTable) {
        this.pdataTable = pdataTable;
        this.pindexTable = pindexTable;
        final Set<String> handled = new HashSet<String>();

        // pass 1: index columns that are data table pk columns
        for (PColumn candidate : pindexTable.getColumns()) {
            if (!IndexUtil.isDataPKColumn(candidate)) {
                continue;
            }
            final String name = candidate.getName().getString();
            recordDataColumn(name, dataPkColNames);
            recordIndexColumn(candidate, indexPkColNames);
            handled.add(name);
        }

        // pass 2: the index pk columns not seen in pass 1
        for (PColumn pkCol : pindexTable.getPKColumns()) {
            final String name = pkCol.getName().getString();
            if (handled.contains(name)) {
                continue;
            }
            recordIndexColumn(pkCol, indexPkColNames);
            recordDataColumn(name, dataNonPkColNames);
            handled.add(name);
        }

        // pass 3: the covered columns (everything not handled above)
        for (PColumn remaining : pindexTable.getColumns()) {
            final String name = remaining.getName().getString();
            if (handled.contains(name)) {
                continue;
            }
            recordIndexColumn(remaining, indexNonPkColNames);
            recordDataColumn(name, dataNonPkColNames);
        }

        indexColNames = Lists.newArrayList(Iterables.concat(indexPkColNames, indexNonPkColNames));
        dataColNames = Lists.newArrayList(Iterables.concat(dataPkColNames, dataNonPkColNames));
    }

    /** Appends the index column's name to {@code names} and its SQL type to the index type list. */
    private void recordIndexColumn(PColumn indexCol, List<String> names) {
        names.add(indexCol.getName().getString());
        indexColSqlTypeNames.add(getDataTypeString(indexCol));
    }

    /**
     * Looks up the data table column for {@code indexColName}, appends its full name to
     * {@code names} and its SQL type to the data type list.
     */
    private void recordDataColumn(String indexColName, List<String> names) {
        final PColumn dataCol = IndexUtil.getDataColumn(pdataTable, indexColName);
        names.add(getDataColFullName(dataCol));
        dataColSqlTypeNames.add(getDataTypeString(dataCol));
    }

    /**
     * Renders the column's type as text: the bare type name when the column has no max length,
     * {@code TYPE(maxLength)} otherwise, and for DECIMAL additionally the scale when present,
     * as {@code TYPE(maxLength,scale)}.
     */
    private String getDataTypeString(PColumn col) {
        final PDataType<?> dataType = col.getDataType();
        final boolean isDecimal = dataType.getSqlType() == Types.DECIMAL;
        if (col.getMaxLength() == null) {
            return dataType.toString();
        }
        if (!isDecimal) {
            return String.format("%s(%s)", dataType.toString(), col.getMaxLength());
        }
        final StringBuilder text = new StringBuilder(dataType.toString());
        text.append("(").append(col.getMaxLength().toString());
        if (col.getScale() != null) {
            text.append(",").append(col.getScale().toString());
        }
        return text.append(")").toString();
    }

    /** @return the column name, prefixed by its family name and the name separator if it has one */
    private String getDataColFullName(PColumn dCol) {
        final StringBuilder fullName = new StringBuilder();
        if (dCol.getFamilyName() != null) {
            fullName.append(dCol.getFamilyName().getString()).append(QueryConstants.NAME_SEPARATOR);
        }
        return fullName.append(dCol.getName().getString()).toString();
    }

    /** Pairs each escaped column name with the type at the same position, separated by a space. */
    private List<String> getDynamicCols(List<String> colNames, List<String> colTypes) {
        final int count = colNames.size();
        final List<String> result = Lists.newArrayListWithCapacity(count);
        for (int pos = 0; pos < count; pos++) {
            result.add(SchemaUtil.getEscapedFullColumnName(colNames.get(pos)) + " "
                    + colTypes.get(pos));
        }
        return result;
    }

    /** @return a transformed view of {@code qualifiedCols} holding only the name after the separator */
    private List<String> getUnqualifiedColNames(List<String> qualifiedCols) {
        return Lists.transform(qualifiedCols, new Function<String, String>() {
            @Override
            public String apply(String qualified) {
                return SchemaUtil.getTableNameFromFullName(qualified,
                    QueryConstants.NAME_SEPARATOR);
            }
        });
    }

    /**
     * @return one {@code CAST(<escaped column> AS <type>)} expression per column, using the type
     *         at the same position in {@code castTypes}
     */
    protected List<String> getCastedColumnNames(List<String> colNames, List<String> castTypes) {
        final List<String> casts = Lists.newArrayListWithCapacity(colNames.size());
        final List<String> escaped = SchemaUtil.getEscapedFullColumnNames(colNames);
        int pos = 0;
        while (pos < escaped.size()) {
            casts.add("CAST(" + escaped.get(pos) + " AS " + castTypes.get(pos) + ")");
            pos++;
        }
        return casts;
    }

    public String getQualifiedDataTableName() {
        final String schema = pdataTable.getSchemaName().getString();
        final String table = pdataTable.getTableName().getString();
        return SchemaUtil.getQualifiedTableName(schema, table);
    }

    public String getQualifiedIndexTableName() {
        final String schema = pindexTable.getSchemaName().getString();
        final String table = pindexTable.getTableName().getString();
        return SchemaUtil.getQualifiedTableName(schema, table);
    }

    /**
     * @return the escaped data column names (equivalents for the index columns) along with their
     *         sql type, for use in dynamic column queries/upserts
     */
    public List<String> getDynamicDataCols() {
        // don't want the column family for dynamic columns
        final List<String> unqualified = getUnqualifiedDataColNames();
        return getDynamicCols(unqualified, dataColSqlTypeNames);
    }

    /**
     * @return the escaped index column names along with their sql type, for use in dynamic column
     *         queries/upserts
     */
    public List<String> getDynamicIndexCols() {
        // don't want the column family for dynamic columns
        final List<String> unqualified = getUnqualifiedIndexColNames();
        return getDynamicCols(unqualified, indexColSqlTypeNames);
    }

    /**
     * @return the corresponding data table column names for the index columns, leading with the
     *         data table pk columns
     */
    public List<String> getDataColNames() {
        return dataColNames;
    }

    /**
     * @return same as getDataColNames, without the column family qualifier
     */
    public List<String> getUnqualifiedDataColNames() {
        return getUnqualifiedColNames(dataColNames);
    }

    /**
     * @return the corresponding data table column names for the index columns, which are not part
     *         of the data table pk
     */
    public List<String> getDataNonPkColNames() {
        return dataNonPkColNames;
    }

    /**
     * @return the corresponding data table column names for the index columns, which are part of
     *         the data table pk
     */
    public List<String> getDataPkColNames() {
        return dataPkColNames;
    }

    /**
     * @return the index column names, leading with the data table pk columns
     */
    public List<String> getIndexColNames() {
        return indexColNames;
    }

    /**
     * @return same as getIndexColNames, without the column family qualifier
     */
    public List<String> getUnqualifiedIndexColNames() {
        return getUnqualifiedColNames(indexColNames);
    }

    /**
     * @return the index pk column names
     */
    public List<String> getIndexPkColNames() {
        return indexPkColNames;
    }
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index 1302f85..db11f7d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -41,6 +41,8 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper; import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor; import org.apache.phoenix.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.OutputFormat; +import org.apache.phoenix.mapreduce.index.IndexScrutinyTool.SourceTable; import org.apache.phoenix.util.ColumnInfo; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.QueryUtil; @@ -103,7 +105,29 @@ public final class PhoenixConfigurationUtil { public static final String INDEX_DISABLED_TIMESTAMP_VALUE = "phoenix.mr.index.disableTimestamp"; public static final String INDEX_MAINTAINERS = "phoenix.mr.index.maintainers"; - + + public static final String SCRUTINY_DATA_TABLE_NAME = "phoenix.mr.scrutiny.data.table.name"; + + public static final String SCRUTINY_INDEX_TABLE_NAME = "phoenix.mr.scrutiny.index.table.name"; + + public static final String SCRUTINY_SOURCE_TABLE = "phoenix.mr.scrutiny.source.table"; + + public static final String SCRUTINY_BATCH_SIZE = "phoenix.mr.scrutiny.batch.size"; + + public static final String SCRUTINY_OUTPUT_INVALID_ROWS = + "phoenix.mr.scrutiny.output.invalid.rows"; + + public static final boolean DEFAULT_SCRUTINY_OUTPUT_INVALID_ROWS = false; + + public static final String SCRUTINY_OUTPUT_FORMAT = "phoenix.mr.scrutiny.output.format"; + + public static final String SCRUTINY_EXECUTE_TIMESTAMP = "phoenix.mr.scrutiny.execute.timestamp"; + + // max output rows per mapper + public static final String SCRUTINY_OUTPUT_MAX = "phoenix.mr.scrutiny.output.max"; + + public static final long DEFAULT_SCRUTINY_BATCH_SIZE = 1000; + public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes"; // Generate splits based on scans from stats, or just from region splits @@ -295,6 +319,12 @@ public 
final class PhoenixConfigurationUtil { return upsertStmt; } + + public static void setUpsertStatement(final Configuration configuration, String upsertStmt) throws SQLException { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(upsertStmt); + configuration.set(UPSERT_STATEMENT, upsertStmt); + } public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException { Preconditions.checkNotNull(configuration); @@ -475,7 +505,101 @@ public final class PhoenixConfigurationUtil { Preconditions.checkNotNull(indexName); configuration.set(DISABLED_INDEXES, indexName); } - + + public static String getScrutinyDataTableName(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(SCRUTINY_DATA_TABLE_NAME); + } + + public static void setScrutinyDataTable(Configuration configuration, String qDataTableName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(qDataTableName); + configuration.set(SCRUTINY_DATA_TABLE_NAME, qDataTableName); + } + + public static String getScrutinyIndexTableName(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(SCRUTINY_INDEX_TABLE_NAME); + } + + public static void setScrutinyIndexTable(Configuration configuration, String qIndexTableName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(qIndexTableName); + configuration.set(SCRUTINY_INDEX_TABLE_NAME, qIndexTableName); + } + + public static SourceTable getScrutinySourceTable(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return SourceTable.valueOf(configuration.get(SCRUTINY_SOURCE_TABLE)); + } + + public static void setScrutinySourceTable(Configuration configuration, + SourceTable sourceTable) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(sourceTable); + configuration.set(SCRUTINY_SOURCE_TABLE, sourceTable.name()); + } + + 
public static boolean getScrutinyOutputInvalidRows(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.getBoolean(SCRUTINY_OUTPUT_INVALID_ROWS, + DEFAULT_SCRUTINY_OUTPUT_INVALID_ROWS); + } + + public static void setScrutinyOutputInvalidRows(Configuration configuration, + boolean outputInvalidRows) { + Preconditions.checkNotNull(configuration); + configuration.setBoolean(SCRUTINY_OUTPUT_INVALID_ROWS, outputInvalidRows); + } + + public static long getScrutinyBatchSize(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.getLong(SCRUTINY_BATCH_SIZE, DEFAULT_SCRUTINY_BATCH_SIZE); + } + + public static void setScrutinyBatchSize(Configuration configuration, long batchSize) { + Preconditions.checkNotNull(configuration); + configuration.setLong(SCRUTINY_BATCH_SIZE, batchSize); + } + + public static OutputFormat getScrutinyOutputFormat(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return OutputFormat + .valueOf(configuration.get(SCRUTINY_OUTPUT_FORMAT, OutputFormat.FILE.name())); + } + + public static void setScrutinyOutputFormat(Configuration configuration, + OutputFormat outputFormat) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(outputFormat); + configuration.set(SCRUTINY_OUTPUT_FORMAT, outputFormat.name()); + } + + public static long getScrutinyExecuteTimestamp(Configuration configuration) { + Preconditions.checkNotNull(configuration); + long ts = configuration.getLong(SCRUTINY_EXECUTE_TIMESTAMP, -1); + Preconditions.checkArgument(ts != -1); + return ts; + } + + public static void setScrutinyOutputMax(Configuration configuration, + long outputMaxRows) { + Preconditions.checkNotNull(configuration); + configuration.setLong(SCRUTINY_OUTPUT_MAX, outputMaxRows); + } + + public static long getScrutinyOutputMax(Configuration configuration) { + Preconditions.checkNotNull(configuration); + long maxRows = 
configuration.getLong(SCRUTINY_OUTPUT_MAX, -1); + Preconditions.checkArgument(maxRows != -1); + return maxRows; + } + + public static void setScrutinyExecuteTimestamp(Configuration configuration, long ts) { + Preconditions.checkNotNull(configuration); + configuration.setLong(SCRUTINY_EXECUTE_TIMESTAMP, ts); + } + public static String getDisableIndexes(Configuration configuration) { Preconditions.checkNotNull(configuration); return configuration.get(DISABLED_INDEXES); http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java index 7961453..d7154a1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java @@ -30,6 +30,7 @@ import java.util.Properties; import javax.annotation.Nullable; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -203,30 +204,74 @@ public final class QueryUtil { * @return Select Query */ public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos,final String conditions) { - Preconditions.checkNotNull(fullTableName,"Table name cannot be null"); - if(columnInfos == null || columnInfos.isEmpty()) { - throw new IllegalArgumentException("At least one column must be provided"); + List<String> columns = Lists.transform(columnInfos, new Function<ColumnInfo, String>(){ + @Override + public String apply(ColumnInfo input) { + return input.getColumnName(); + }}); + return constructSelectStatement(fullTableName, columns , conditions, null, false); + } + + /** + * + * @param fullTableName name of the table for which the 
select statement needs to be created. + * @param columns list of columns to be projected in the select statement. + * @param conditions The condition clause to be added to the WHERE condition + * @param hint hint to use + * @param escapeCols whether to escape the projected columns + * @return Select Query + */ + public static String constructSelectStatement(String fullTableName, List<String> columns, + final String conditions, Hint hint, boolean escapeCols) { + Preconditions.checkNotNull(fullTableName, "Table name cannot be null"); + if (columns == null || columns.isEmpty()) { + throw new IllegalArgumentException("At least one column must be provided"); } StringBuilder query = new StringBuilder(); query.append("SELECT "); - for (ColumnInfo cinfo : columnInfos) { - if (cinfo != null) { - String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName()); + + String hintStr = ""; + if (hint != null) { + final HintNode node = new HintNode(hint.name()); + hintStr = node.toString(); + } + query.append(hintStr); + + for (String col : columns) { + if (col != null) { + String fullColumnName = col; + if (escapeCols) { + fullColumnName = getEscapedFullColumnName(col); + } query.append(fullColumnName); query.append(","); - } - } + } + } // Remove the trailing comma query.setLength(query.length() - 1); query.append(" FROM "); query.append(fullTableName); - if(conditions != null && conditions.length() > 0) { + if (conditions != null && conditions.length() > 0) { query.append(" WHERE (").append(conditions).append(")"); } return query.toString(); } /** + * Constructs parameterized filter for an IN clause e.g. 
passing in numWhereCols=2, numBatches=3 + * results in ((?,?),(?,?),(?,?)) + * @param numWhereCols number of WHERE columns + * @param numBatches number of column batches + * @return parameterized IN filter + */ + public static String constructParameterizedInClause(int numWhereCols, int numBatches) { + Preconditions.checkArgument(numWhereCols > 0); + Preconditions.checkArgument(numBatches > 0); + String batch = "(" + StringUtils.repeat("?", ",", numWhereCols) + ")"; + return "(" + StringUtils.repeat(batch, ",", numBatches) + ")"; + } + + /** * Create the Phoenix JDBC connection URL from the provided cluster connection details. */ public static String getUrl(String zkQuorum) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java index d54670c..51f6ff9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -80,7 +80,9 @@ import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.schema.types.PVarchar; +import com.google.common.base.Function; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -335,6 +337,15 @@ public class SchemaUtil { return getName(familyName, columnName, false); } + public static List<String> getColumnNames(List<PColumn> pCols) { + return Lists.transform(pCols, new Function<PColumn, String>() { + @Override + public String apply(PColumn input) { + return input.getName().getString(); + } + }); + } + public static byte[] getTableNameAsBytes(String schemaName, String tableName) { 
if (schemaName == null || schemaName.length() == 0) { return StringUtil.toBytes(tableName); @@ -767,6 +778,16 @@ public class SchemaUtil { return getEscapedArgument(columnFamily) + QueryConstants.NAME_SEPARATOR + getEscapedArgument(columnName) ; } + public static List<String> getEscapedFullColumnNames(List<String> fullColumnNames) { + return Lists + .newArrayList(Iterables.transform(fullColumnNames, new Function<String, String>() { + @Override + public String apply(String col) { + return getEscapedFullColumnName(col); + } + })); + } + public static String getEscapedFullTableName(String fullTableName) { final String schemaName = getSchemaNameFromFullName(fullTableName); final String tableName = getTableNameFromFullName(fullTableName); http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/BaseIndexTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/BaseIndexTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/BaseIndexTest.java new file mode 100644 index 0000000..4ec4a0c --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/BaseIndexTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.Properties; + +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; + +/** + * + * Creates a simple data table and index table + * + */ +public class BaseIndexTest extends BaseConnectionlessQueryTest { + protected static final String SCHEMA_NAME = "TEST_SCHEMA"; + protected static final String DATA_TABLE_NAME = "TEST_INDEX_COLUMN_NAMES_UTIL"; + protected static final String INDEX_TABLE_NAME = "TEST_ICN_INDEX"; + protected static final String DATA_TABLE_FULL_NAME = SCHEMA_NAME + "." + DATA_TABLE_NAME; + protected static final String INDEX_TABLE_FULL_NAME = SCHEMA_NAME + "." 
+ INDEX_TABLE_NAME; + + private static final String DATA_TABLE_DDL = + "CREATE TABLE IF NOT EXISTS " + DATA_TABLE_FULL_NAME + "\n" + + "(\n" + + " ID INTEGER NOT NULL,\n" + + " PK_PART2 TINYINT NOT NULL,\n" + + " NAME VARCHAR,\n" + + " ZIP BIGINT,\n" + + " EMPLOYER CHAR(20),\n" + + " CONSTRAINT PK PRIMARY KEY\n" + + " (\n" + + " ID,\n" + + " PK_PART2\n" + + " \n" + + " )\n" + + ")"; + + private static final String INDEX_TABLE_DDL = + "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + + " (NAME) INCLUDE (ZIP)"; + protected PTable pDataTable; + protected PTable pIndexTable; + protected Connection conn; + + @BeforeClass + public static void setupClass() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + Connection conn = DriverManager.getConnection(getUrl(), props); + try { + conn.setAutoCommit(true); + conn.createStatement().execute(DATA_TABLE_DDL); + conn.createStatement().execute(INDEX_TABLE_DDL); + } finally { + conn.close(); + } + } + + @Before + public void setup() throws Exception { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + conn = DriverManager.getConnection(getUrl(), props); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), DATA_TABLE_FULL_NAME)); + pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), INDEX_TABLE_FULL_NAME)); + } + + @After + public void tearDown() throws Exception { + if (conn != null) { + conn.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java new file mode 100644 
index 0000000..c6bb739 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTableOutputTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.phoenix.mapreduce.index; + +import static org.junit.Assert.assertEquals; + +import java.sql.SQLException; +import java.util.Arrays; + +import org.apache.phoenix.mapreduce.util.IndexColumnNames; +import org.junit.Before; +import org.junit.Test; + +public class IndexScrutinyTableOutputTest extends BaseIndexTest { + + private static final long SCRUTINY_TIME_MILLIS = 1502908914193L; + + @Before + public void setup() throws Exception { + super.setup(); + conn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_TABLE_DDL); + conn.createStatement().execute(IndexScrutinyTableOutput.OUTPUT_METADATA_DDL); + } + + @Test + public void testConstructMetadataParamQuery() { + String metadataParamQuery = + IndexScrutinyTableOutput + .constructMetadataParamQuery(Arrays.asList("INVALID_ROWS_QUERY_ALL")); + assertEquals( + "SELECT \"INVALID_ROWS_QUERY_ALL\" FROM PHOENIX_INDEX_SCRUTINY_METADATA WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN ((?,?,?))", + metadataParamQuery); + } + + @Test + public void 
testGetSqlQueryAllInvalidRows() throws SQLException { + SourceTargetColumnNames columnNames = + new SourceTargetColumnNames.DataSourceColNames(pDataTable, pIndexTable); + String sqlStr = + IndexScrutinyTableOutput.getSqlQueryAllInvalidRows(conn, columnNames, + SCRUTINY_TIME_MILLIS); + assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193))", + sqlStr); + } + + @Test + public void testGetSqlQueryMissingTargetRows() throws SQLException { + SourceTargetColumnNames columnNames = + new SourceTargetColumnNames.DataSourceColNames(pDataTable, pIndexTable); + String query = + IndexScrutinyTableOutput.getSqlQueryMissingTargetRows(conn, columnNames, + SCRUTINY_TIME_MILLIS); + assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,false))", + query); + } + + @Test + public void testGetSqlQueryBadCoveredColVal() throws SQLException { + SourceTargetColumnNames columnNames = + new SourceTargetColumnNames.DataSourceColNames(pDataTable, pIndexTable); + String query = + 
IndexScrutinyTableOutput.getSqlQueryBadCoveredColVal(conn, columnNames, + SCRUTINY_TIME_MILLIS); + assertEquals("SELECT \"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\",\"SOURCE_ROW_PK_HASH\",\"SOURCE_TS\",\"TARGET_TS\",\"HAS_TARGET_ROW\",\"ID\",\"PK_PART2\",\"NAME\",\"ZIP\",\":ID\",\":PK_PART2\",\"0:NAME\",\"0:ZIP\" FROM PHOENIX_INDEX_SCRUTINY(\"ID\" INTEGER,\"PK_PART2\" TINYINT,\"NAME\" VARCHAR,\"ZIP\" BIGINT,\":ID\" INTEGER,\":PK_PART2\" TINYINT,\"0:NAME\" VARCHAR,\"0:ZIP\" BIGINT) WHERE (\"SOURCE_TABLE\",\"TARGET_TABLE\",\"SCRUTINY_EXECUTE_TIME\", \"HAS_TARGET_ROW\") IN (('TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL','TEST_SCHEMA.TEST_ICN_INDEX',1502908914193,true))", + query); + } + + @Test + public void testGetOutputTableUpsert() throws Exception { + IndexColumnNames columnNames = new IndexColumnNames(pDataTable, pIndexTable); + String outputTableUpsert = + IndexScrutinyTableOutput.constructOutputTableUpsert( + columnNames.getDynamicDataCols(), columnNames.getDynamicIndexCols(), conn); + conn.prepareStatement(outputTableUpsert); // shouldn't throw + assertEquals("UPSERT INTO PHOENIX_INDEX_SCRUTINY (\"SOURCE_TABLE\", \"TARGET_TABLE\", \"SCRUTINY_EXECUTE_TIME\", \"SOURCE_ROW_PK_HASH\", \"SOURCE_TS\", \"TARGET_TS\", \"HAS_TARGET_ROW\", \"ID\" INTEGER, \"PK_PART2\" TINYINT, \"NAME\" VARCHAR, \"ZIP\" BIGINT, \":ID\" INTEGER, \":PK_PART2\" TINYINT, \"0:NAME\" VARCHAR, \"0:ZIP\" BIGINT) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + outputTableUpsert); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/IndexColumnNamesTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/IndexColumnNamesTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/IndexColumnNamesTest.java new file mode 100644 index 0000000..48c688f --- /dev/null +++ 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/IndexColumnNamesTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.util; + +import static org.junit.Assert.assertEquals; + +import java.sql.SQLException; + +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.index.BaseIndexTest; +import org.apache.phoenix.parse.HintNode.Hint; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.QueryUtil; +import org.junit.Test; + +public class IndexColumnNamesTest extends BaseIndexTest { + + private static final String DYNAMIC_COL_DDL = + "CREATE TABLE IF NOT EXISTS PRECISION_NAME_TEST\n" + "(\n" + + " CHAR_TEST CHAR(15) NOT NULL primary key,\n" + + " VARCHAR_TEST VARCHAR(1),\n" + " DECIMAL_TEST DECIMAL(10,2),\n" + + " BINARY_TEST BINARY(11),\n" + + " VARCHAR_UNSPEC VARCHAR,\n" + + " DEC_UNSPEC DECIMAL\n" + ")"; + + private static final String DYNAMIC_COL_IDX_DDL = + "CREATE INDEX PRECISION_NAME_IDX_TEST ON PRECISION_NAME_TEST(VARCHAR_TEST) INCLUDE (CHAR_TEST,DECIMAL_TEST,BINARY_TEST,VARCHAR_UNSPEC,DEC_UNSPEC)"; + + @Test + public void testGetColumnNames() { + IndexColumnNames indexColumnNames = 
new IndexColumnNames(pDataTable, pIndexTable); + assertEquals("[ID, PK_PART2, 0.NAME, 0.ZIP]", indexColumnNames.getDataColNames().toString()); + assertEquals("[:ID, :PK_PART2, 0:NAME, 0:ZIP]", indexColumnNames.getIndexColNames().toString()); //index column names, leading with the data table pk + assertEquals("[:ID, :PK_PART2, 0:NAME]", indexColumnNames.getIndexPkColNames().toString()); + assertEquals("[ID, PK_PART2]", indexColumnNames.getDataPkColNames().toString()); + assertEquals("[0.NAME, 0.ZIP]", indexColumnNames.getDataNonPkColNames().toString()); + + assertEquals("[\"ID\" INTEGER, \"PK_PART2\" TINYINT, \"NAME\" VARCHAR, \"ZIP\" BIGINT]", indexColumnNames.getDynamicDataCols().toString()); + assertEquals("[\":ID\" INTEGER, \":PK_PART2\" TINYINT, \"0:NAME\" VARCHAR, \"0:ZIP\" BIGINT]", indexColumnNames.getDynamicIndexCols().toString()); + assertEquals("UPSERT /*+ NO_INDEX */ INTO TEST_SCHEMA.TEST_INDEX_COLUMN_NAMES_UTIL (\"ID\" INTEGER, \"PK_PART2\" TINYINT, \"NAME\" VARCHAR, \"ZIP\" BIGINT) VALUES (?, ?, ?, ?)", QueryUtil.constructUpsertStatement(DATA_TABLE_FULL_NAME, indexColumnNames.getDynamicDataCols(), Hint.NO_INDEX)); + } + + /** + * Tests that col types with a precision are outputted correctly in the dynamic columns + * @throws SQLException + */ + @Test + public void testGetDynamicColPrecision() throws SQLException { + conn.createStatement().execute(DYNAMIC_COL_DDL); + conn.createStatement().execute(DYNAMIC_COL_IDX_DDL); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + pDataTable = pconn.getTable(new PTableKey(pconn.getTenantId(), "PRECISION_NAME_TEST")); + pIndexTable = pconn.getTable(new PTableKey(pconn.getTenantId(), "PRECISION_NAME_IDX_TEST")); + IndexColumnNames indexColumnNames = new IndexColumnNames(pDataTable, pIndexTable); + assertEquals("[\"CHAR_TEST\" CHAR(15), \"VARCHAR_TEST\" VARCHAR(1), \"DECIMAL_TEST\" DECIMAL(10,2), \"BINARY_TEST\" BINARY(11), \"VARCHAR_UNSPEC\" VARCHAR, \"DEC_UNSPEC\" DECIMAL]", 
indexColumnNames.getDynamicDataCols().toString()); + assertEquals("[\":CHAR_TEST\" CHAR(15), \"0:VARCHAR_TEST\" VARCHAR(1), \"0:DECIMAL_TEST\" DECIMAL(10,2), \"0:BINARY_TEST\" BINARY(11), \"0:VARCHAR_UNSPEC\" VARCHAR, \"0:DEC_UNSPEC\" DECIMAL]", + indexColumnNames.getDynamicIndexCols().toString()); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/124404ae/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java index 45f536d..2d094f6 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java @@ -27,9 +27,11 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.parse.HintNode.Hint; import org.junit.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; public class QueryUtilTest { @@ -94,7 +96,21 @@ public class QueryUtilTest { "SELECT \"ID\",\"NAME\" FROM \"a\".\"mytab\"", QueryUtil.constructSelectStatement(fullTableName, ImmutableList.of(ID_COLUMN,NAME_COLUMN),null)); } - + + @Test + public void testConstructSelectWithHint() { + assertEquals( + "SELECT /*+ NO_INDEX */ \"col1\",\"col2\" FROM MYTAB WHERE (\"col2\"=? and \"col3\" is null)", + QueryUtil.constructSelectStatement("MYTAB", Lists.newArrayList("col1", "col2"), + "\"col2\"=? 
and \"col3\" is null", Hint.NO_INDEX, true)); + } + + @Test + public void testConstructParameterizedInClause() { + assertEquals("((?,?,?),(?,?,?))", QueryUtil.constructParameterizedInClause(3, 2)); + assertEquals("((?))", QueryUtil.constructParameterizedInClause(1, 1)); + } + /** * Test that we create connection strings from the HBase Configuration that match the * expected syntax. Expected to log exceptions as it uses ZK host names that don't exist