TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ca5fb301 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ca5fb301 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ca5fb301 Branch: refs/heads/block_iteration Commit: ca5fb301bff4b38d80a523d5bece9eaf74f64ec3 Parents: ca187bc Author: Jaehwa Jung <[email protected]> Authored: Mon Oct 6 14:12:42 2014 +0900 Committer: Jaehwa Jung <[email protected]> Committed: Mon Oct 6 14:12:42 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 2 +- .../apache/tajo/master/querymaster/Query.java | 135 +++++++++++++++++-- .../tajo/engine/query/TestTablePartitions.java | 53 ++++++-- 4 files changed, 170 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/ca5fb301/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index bcb4a7d..8f74a7f 100644 --- a/CHANGES +++ b/CHANGES @@ -158,6 +158,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa) + TAJO-1065: The \admin -cluster argument doesn't run as expected. (Jongyoung Park via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/ca5fb301/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index b5a9b50..f9f5e4a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -359,7 +359,7 @@ public class TajoConf extends Configuration { $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false), // ResultSet --------------------------------------------------------- - $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200) + $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200), ; public final String varname; http://git-wip-us.apache.org/repos/asf/tajo/blob/ca5fb301/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index c2cf54e..7899365 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -418,25 +418,75 @@ public class Query implements EventHandler<QueryEvent> { if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO - // it moves the original table into the temporary location. + // It moves the original table into the temporary location. // Then it moves the new result table into the original table location. // Upon failed, it recovers the original table if possible. boolean movedToOldTable = false; boolean committed = false; Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - try { - if (fs.exists(finalOutputDir)) { - fs.rename(finalOutputDir, oldTableDir); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir.getParent()); + + if (queryContext.hasPartition()) { + // This is a map for existing non-leaf directory to rename. A key is current directory and a value is + // renaming directory. + Map<Path, Path> renameDirs = TUtil.newHashMap(); + // This is a map for recovering existing partition directory. A key is current directory and a value is + // temporary directory to back up. + Map<Path, Path> recoveryDirs = TUtil.newHashMap(); + + try { + if (!fs.exists(finalOutputDir)) { + fs.mkdirs(finalOutputDir); + } + + visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), + renameDirs, oldTableDir); + + // Rename target partition directories + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + // Backup existing data files for recovering + if (fs.exists(entry.getValue())) { + String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), + oldTableDir.toString()); + Path recoveryPath = new Path(recoveryPathString); + fs.rename(entry.getValue(), recoveryPath); + fs.exists(recoveryPath); + recoveryDirs.put(entry.getValue(), recoveryPath); + } + // Delete existing directory + fs.delete(entry.getValue(), true); + // Rename staging directory to final output directory + fs.rename(entry.getKey(), entry.getValue()); + } + + } catch (IOException ioe) { + // Remove created dirs + for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { + fs.delete(entry.getValue(), true); + } + + // Recovery renamed dirs + for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { + fs.delete(entry.getValue(), true); + fs.rename(entry.getValue(), entry.getKey()); + } + throw new IOException(ioe.getMessage()); } - fs.rename(stagingResultDir, finalOutputDir); - committed = fs.exists(finalOutputDir); - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - fs.rename(oldTableDir, finalOutputDir); + } else { + try { + if (fs.exists(finalOutputDir)) { + fs.rename(finalOutputDir, oldTableDir); + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir.getParent()); + } + + fs.rename(stagingResultDir, finalOutputDir); + committed = fs.exists(finalOutputDir); + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + fs.rename(oldTableDir, finalOutputDir); + } } } } else { @@ -486,6 +536,65 @@ public class Query implements EventHandler<QueryEvent> { return finalOutputDir; } + /** + * This method sets a rename map which includes renamed staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. + * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. + String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!isLeafDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; + } + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { FileStatus[] files = fs.listStatus(stagingPath); if (files != null && files.length != 0) { http://git-wip-us.apache.org/repos/asf/tajo/blob/ca5fb301/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 5bf2944..da09129 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -25,10 +25,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.DeflateCodec; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -47,6 +44,7 @@ import org.apache.tajo.master.querymaster.QueryUnit; import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TajoWorker; import org.junit.Test; @@ -485,6 +483,45 @@ public class TestTablePartitions extends QueryTestCaseBase { "R,3,3,49.0\n" + "R,3,3,49.0\n"; assertEquals(expected, resultSetData); + + // Check not to remove existing partition directories. + res = executeString("insert overwrite into " + tableName + + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem " + + " where l_orderkey = 1 and l_partkey = 1 and l_linenumber = 1"); + res.close(); + + desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + assertTrue(fs.isDirectory(path)); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0"))); + + if (!testingCluster.isHCatalogStoreRunning()) { + // TODO: If there is existing another partition directory, we must add its rows number to result row numbers. + // assertEquals(6, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + tableName + " where col2 = 1"); + resultSetData = resultSetToString(res); + res.close(); + expected = "col4,col1,col2,col3\n" + + "-------------------------------\n" + + "N,1,1,17.0\n" + + "N,1,1,17.0\n" + + "N,1,1,30.0\n" + + "N,1,1,36.0\n" + + "N,1,1,36.0\n"; + + assertEquals(expected, resultSetData); } @Test @@ -888,16 +925,16 @@ public class TestTablePartitions extends QueryTestCaseBase { executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl"); - executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)") + executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)") .close(); - executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") + executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") .close(); - ResultSet res = executeString("select * from pTable947 where type='RA:*?><I/L#%S'"); + ResultSet res = executeString("select * from pTable948 where type='RA:*?><I/L#%S'"); assertResultSet(res); cleanupQuery(res); - res = executeString("select * from pTable947 where type='RA:*?><I/L#%S' or type='AIR01'"); + res = executeString("select * from pTable948 where type='RA:*?><I/L#%S' or type='AIR01'"); assertResultSet(res); cleanupQuery(res); }
