PHOENIX-1248 CsvBulkLoadTool is failing with IAE when local index specified for --index-table parameter (Gabriel Reid)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d6e7846f Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d6e7846f Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d6e7846f Branch: refs/heads/calcite Commit: d6e7846f491e09780a2d663cc5c23bc21244e26c Parents: 3f48938 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Sun Feb 1 09:28:29 2015 -0800 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Sun Feb 1 09:28:29 2015 -0800 ---------------------------------------------------------------------- .../phoenix/mapreduce/CsvBulkLoadToolIT.java | 79 ++++++++++++++---- .../phoenix/mapreduce/CsvBulkLoadTool.java | 85 ++++++++++++++------ .../phoenix/mapreduce/CsvToKeyValueMapper.java | 3 +- 3 files changed, 126 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java index 4373f47..0501142 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java @@ -17,12 +17,6 @@ */ package org.apache.phoenix.mapreduce; -import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - import java.io.PrintWriter; import java.sql.Connection; import java.sql.DriverManager; @@ -42,6 +36,12 @@ import org.junit.BeforeClass; import org.junit.Test; 
import org.junit.experimental.categories.Category; +import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + @Category(NeedsOwnMiniClusterTest.class) public class CsvBulkLoadToolIT { @@ -191,17 +191,62 @@ public class CsvBulkLoadToolIT { rs.close(); stmt.close(); } - + @Test - public void testImportOneIndexTable() throws Exception { + public void testImportWithLocalIndex() throws Exception { Statement stmt = conn.createStatement(); - stmt.execute("CREATE TABLE TABLE4 (ID INTEGER NOT NULL PRIMARY KEY, " + - "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); - String ddl = "CREATE INDEX TABLE4_IDX ON TABLE4 " + stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 " + " (FIRST_NAME ASC)"; stmt.execute(ddl); - + + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool(); + csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); + int exitCode = csvBulkLoadTool.run(new String[] { + "--input", "/tmp/input3.csv", + "--table", "table6", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'"); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("FirstName 2", rs.getString(2)); + + rs.close(); + stmt.close(); + } + + @Test + public void testImportOneIndexTable() throws Exception { + 
testImportOneIndexTable("TABLE4", false); + } + + @Test + public void testImportOneLocalIndexTable() throws Exception { + testImportOneIndexTable("TABLE5", true); + } + + public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception { + + String indexTableName = String.format("%s_IDX", tableName); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + String ddl = + "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON " + + tableName + "(FIRST_NAME ASC)"; + stmt.execute(ddl); + FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration()); FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv")); PrintWriter printWriter = new PrintWriter(outputStream); @@ -213,14 +258,14 @@ public class CsvBulkLoadToolIT { csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration()); int exitCode = csvBulkLoadTool.run(new String[] { "--input", "/tmp/input4.csv", - "--table", "table4", - "--index-table", "TABLE4_IDX", - "--zookeeper", zkQuorum}); + "--table", tableName, + "--index-table", indexTableName, + "--zookeeper", zkQuorum }); assertEquals(0, exitCode); - ResultSet rs = stmt.executeQuery("SELECT * FROM TABLE4"); + ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); assertFalse(rs.next()); - rs = stmt.executeQuery("SELECT FIRST_NAME FROM TABLE4 where FIRST_NAME='FirstName 1'"); + rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'"); assertTrue(rs.next()); assertEquals("FirstName 1", rs.getString(1)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java index 54e3f2c..c92a3a3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java @@ -61,8 +61,10 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.job.JobManager; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.util.CSVCommonsLoader; import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.StringUtil; @@ -212,38 +214,39 @@ public class CsvBulkLoadTool extends Configured implements Tool { outputPath = new Path("/tmp/" + UUID.randomUUID()); } - List<String> tablesToBeLoaded = new ArrayList<String>(); - tablesToBeLoaded.add(qualifiedTableName); + List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>(); + tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName)); tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName)); // When loading a single index table, check index table name is correct if(qualifedIndexTableName != null){ - boolean exists = false; - for(String tmpTable : tablesToBeLoaded){ - if(tmpTable.compareToIgnoreCase(qualifedIndexTableName) == 0) { - exists = true; + TargetTableRef targetIndexRef = null; + for (TargetTableRef tmpTable : tablesToBeLoaded){ + if(tmpTable.getLogicalName().compareToIgnoreCase(qualifedIndexTableName) == 0) { + targetIndexRef = tmpTable; break; } } - if(!exists){ + if(targetIndexRef == null){ throw new IllegalStateException("CSV Bulk Loader error: index table " + qualifedIndexTableName + " doesn't exist"); } tablesToBeLoaded.clear(); - tablesToBeLoaded.add(qualifedIndexTableName); + 
tablesToBeLoaded.add(targetIndexRef); } List<Future<Boolean>> runningJobs = new ArrayList<Future<Boolean>>(); ExecutorService executor = JobManager.createThreadPoolExec(Integer.MAX_VALUE, 5, 20); try{ - for(String table : tablesToBeLoaded) { - Path tablePath = new Path(outputPath, table); + for (TargetTableRef table : tablesToBeLoaded) { + Path tablePath = new Path(outputPath, table.getPhysicalName()); Configuration jobConf = new Configuration(conf); jobConf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName); - if(qualifiedTableName.compareToIgnoreCase(table) != 0) { - jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table); + if(qualifiedTableName.compareToIgnoreCase(table.getLogicalName()) != 0) { + jobConf.set(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, table.getPhysicalName()); } - TableLoader tableLoader = new TableLoader(jobConf, table, inputPath, tablePath); + TableLoader tableLoader = new TableLoader( + jobConf, table.getPhysicalName(), inputPath, tablePath); runningJobs.add(executor.submit(tableLoader)); } } finally { @@ -392,20 +395,56 @@ public class CsvBulkLoadTool extends Configured implements Tool { } /** - * Get names of index tables of current data table + * Get the index tables of current data table * @throws java.sql.SQLException */ - private List<String> getIndexTables(Connection conn, String schemaName, String tableName) + private List<TargetTableRef> getIndexTables(Connection conn, String schemaName, String qualifiedTableName) throws SQLException { - PTable table = PhoenixRuntime.getTable(conn, tableName); - List<String> indexTables = new ArrayList<String>(); + PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName); + List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>(); for(PTable indexTable : table.getIndexes()){ - indexTables.add(getQualifiedTableName(schemaName, - indexTable.getTableName().getString())); + if (indexTable.getIndexType() == IndexType.LOCAL) { + indexTables.add( + new 
TargetTableRef(getQualifiedTableName(schemaName, + indexTable.getTableName().getString()), + MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); + } else { + indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName, + indexTable.getTableName().getString()))); + } } return indexTables; } - + + /** + * Represents the logical and physical name of a single table to which data is to be loaded. + * + * This class exists to allow for the difference between HBase physical table names and + * Phoenix logical table names. + */ + private static class TargetTableRef { + + private final String logicalName; + private final String physicalName; + + private TargetTableRef(String name) { + this(name, name); + } + + private TargetTableRef(String logicalName, String physicalName) { + this.logicalName = logicalName; + this.physicalName = physicalName; + } + + public String getLogicalName() { + return logicalName; + } + + public String getPhysicalName() { + return physicalName; + } + } + /** * A runnable to load data into a single table * @@ -445,9 +484,9 @@ public class CsvBulkLoadTool extends Configured implements Tool { // initialize credentials to possibily run in a secure env TableMapReduceUtil.initCredentials(job); - - HTable htable = new HTable(conf, tableName); - + + HTable htable = new HTable(conf, tableName); + // Auto configure partitioner and reducer according to the Main Data table HFileOutputFormat.configureIncrementalLoad(job, htable); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d6e7846f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java index ead5067..6ff7ba3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java +++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java @@ -36,6 +36,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; + import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; @@ -165,7 +166,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes = PhoenixRuntime.getUncommittedDataIterator(conn, true); while (uncommittedDataIterator.hasNext()) { Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next(); - if(Bytes.compareTo(tableName, kvPair.getFirst()) != 0) { + if (Bytes.compareTo(tableName, kvPair.getFirst()) != 0) { // skip edits for other tables continue; }