Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 bb23c5043 -> 4691d8fbd
Fix PhoenixMRJobSubmitter submitting duplicate MR jobs for an index build from IndexTool Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4691d8fb Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4691d8fb Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4691d8fb Branch: refs/heads/4.x-HBase-1.2 Commit: 4691d8fbd884ad71068f792b17071f135205ed1c Parents: bb23c50 Author: Xu Cang <xc...@salesforce.com> Authored: Mon May 7 16:21:46 2018 -0700 Committer: Thomas D'Silva <tdsi...@apache.org> Committed: Thu May 10 15:37:35 2018 -0700 ---------------------------------------------------------------------- .../phoenix/mapreduce/index/IndexTool.java | 6 ++-- .../index/automation/PhoenixAsyncIndex.java | 2 +- .../index/automation/PhoenixMRJobSubmitter.java | 2 +- .../index/automated/MRJobSubmitterTest.java | 30 +++++++++++++++----- 4 files changed, 28 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/4691d8fb/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 671e4cf..e3aa729 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -123,7 +123,7 @@ public class IndexTool extends Configured implements Tool { private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false, "If specified, uses Snapshots for async index building (optional)"); private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); - public static final String INDEX_JOB_NAME_TEMPLATE = 
"PHOENIX_%s_INDX_%s"; + public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s.%s_INDX_%s"; private Options getOptions() { final Options options = new Options(); @@ -373,9 +373,9 @@ public class IndexTool extends Configured implements Tool { PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns); ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); fs = outputPath.getFileSystem(configuration); - fs.delete(outputPath, true); + fs.delete(outputPath, true); - final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, pdataTable.getName().toString(), indexTable); + final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); final Job job = Job.getInstance(configuration, jobName); job.setJarByClass(IndexTool.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); http://git-wip-us.apache.org/repos/asf/phoenix/blob/4691d8fb/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java index 3e88cd0..a61e49a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixAsyncIndex.java @@ -59,7 +59,7 @@ public class PhoenixAsyncIndex { } public String getJobName() { - return String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, dataTableName, tableName); + return String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, tableSchem, dataTableName, tableName); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/4691d8fb/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java index d86802a..3e20bd2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/automation/PhoenixMRJobSubmitter.java @@ -216,7 +216,7 @@ public class PhoenixMRJobSubmitter { indexInfo.setTableSchem(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM)); indexInfo.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME)); candidateIndexes.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, - indexInfo.getDataTableName(), indexInfo.getTableName()), indexInfo); + indexInfo.getTableSchem(), indexInfo.getDataTableName(), indexInfo.getTableName()), indexInfo); } return candidateIndexes; http://git-wip-us.apache.org/repos/asf/phoenix/blob/4691d8fb/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java index 5ed1837..3a4de4c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/automated/MRJobSubmitterTest.java @@ -43,18 +43,20 @@ public class MRJobSubmitterTest { PhoenixAsyncIndex index1 = new PhoenixAsyncIndex(); index1.setDataTableName("DT1"); index1.setTableName("IT1"); + index1.setTableSchem("NEW_SCHEM1"); index1.setIndexType(IndexType.LOCAL); candidateJobs.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, - index1.getDataTableName(), index1.getTableName()), index1); + 
index1.getTableSchem(), index1.getDataTableName(), index1.getTableName()), index1); PhoenixAsyncIndex index2 = new PhoenixAsyncIndex(); index2.setDataTableName("DT2"); index2.setTableName("IT2"); + index2.setTableSchem("NEW_SCHEM2"); index2.setIndexType(IndexType.LOCAL); candidateJobs.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, - index2.getDataTableName(), index2.getTableName()), index2); + index2.getTableSchem(), index2.getDataTableName(), index2.getTableName()), index2); } @Test @@ -71,6 +73,20 @@ public class MRJobSubmitterTest { } @Test + public void testIndexJobsName() throws IOException { + // Verify index job name contains schem name, not only table name. + PhoenixAsyncIndex index = new PhoenixAsyncIndex(); + index.setDataTableName("MyDataTable"); + index.setTableName("MyTableName"); + index.setTableSchem("MySchem"); + index.setIndexType(IndexType.LOCAL); + + String jobName = String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, + index.getTableSchem(), index.getDataTableName(), index.getTableName()); + assertEquals("PHOENIX_MySchem.MyDataTable_INDX_MyTableName", jobName); + } + + @Test public void testGlobalIndexJobsForSubmission() throws IOException { // Set the index type to GLOBAL @@ -91,7 +107,7 @@ public class MRJobSubmitterTest { // Mark one job as running submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, - jobs[0].getDataTableName(), jobs[0].getTableName())); + jobs[0].getTableSchem(), jobs[0].getDataTableName(), jobs[0].getTableName())); PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter(); Set<PhoenixAsyncIndex> jobsToSubmit = @@ -110,9 +126,9 @@ public class MRJobSubmitterTest { // Mark all the candidate jobs as running/in-progress submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, - jobs[0].getDataTableName(), jobs[0].getTableName())); + jobs[0].getTableSchem(), jobs[0].getDataTableName(), jobs[0].getTableName())); submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, - 
jobs[1].getDataTableName(), jobs[1].getTableName())); + jobs[1].getTableSchem(), jobs[1].getDataTableName(), jobs[1].getTableName())); PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter(); Set<PhoenixAsyncIndex> jobsToSubmit = @@ -126,9 +142,9 @@ public class MRJobSubmitterTest { candidateJobs.clear(); // Add some dummy running jobs to the submitted list submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, - "d1", "i1")); + "s1", "d1", "i1")); submittedJobs.add(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE, - "d2", "i2")); + "s2", "d2", "i2")); PhoenixMRJobSubmitter submitter = new PhoenixMRJobSubmitter(); Set<PhoenixAsyncIndex> jobsToSubmit = submitter.getJobsToSubmit(candidateJobs, submittedJobs);