TAJO-803: INSERT INTO without FROM throws ClassCastException. (Hyoungjun Kim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8e800469 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8e800469 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8e800469 Branch: refs/heads/window_function Commit: 8e8004698f4bd891a415fcc62add59998ced2ae3 Parents: 9bbf87e Author: Hyunsik Choi <[email protected]> Authored: Mon May 12 16:31:03 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Mon May 12 16:31:03 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../main/java/org/apache/tajo/cli/TajoCli.java | 12 +- .../java/org/apache/tajo/client/TajoClient.java | 11 +- .../tajo/engine/planner/LogicalPlanner.java | 39 ++--- .../engine/planner/physical/EvalExprExec.java | 22 ++- .../engine/planner/physical/StoreTableExec.java | 7 +- .../org/apache/tajo/master/GlobalEngine.java | 147 ++++++++++++++++--- .../apache/tajo/master/querymaster/Query.java | 10 +- .../master/querymaster/QueryMasterTask.java | 91 +++++++----- .../apache/tajo/worker/TaskAttemptContext.java | 18 ++- .../tajo/engine/query/TestInsertQuery.java | 94 ++++++++++++ 11 files changed, 351 insertions(+), 103 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 17af7bc..47884b8 100644 --- a/CHANGES +++ b/CHANGES @@ -37,6 +37,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-803: INSERT INTO without FROM throws ClassCastException. + (Hyoungjun Kim via hyunsik) + TAJO-813: CLI should support comment character with multi-line query. (Hyoungjun Kim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java index c7368c9..e0ca62a 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java @@ -404,10 +404,18 @@ public class TajoCli { private void localQueryCompleted(ClientProtos.SubmitQueryResponse response, long startTime) { ResultSet res = null; try { - res = TajoClient.createResultSet(client, response); + QueryId queryId = new QueryId(response.getQueryId()); float responseTime = ((float)(System.currentTimeMillis() - startTime) / 1000.0f); TableDesc desc = new TableDesc(response.getTableDesc()); - outputFormatter.printResult(sout, sin, desc, responseTime, res); + + // non-forwarded INSERT INTO query does not have any query id. + // In this case, it just returns succeeded query information without printing the query results. + if (response.getMaxRowNum() < 0 && queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { + outputFormatter.printResult(sout, sin, desc, responseTime, res); + } else { + res = TajoClient.createResultSet(client, response); + outputFormatter.printResult(sout, sin, desc, responseTime, res); + } } catch (Throwable t) { outputFormatter.printErrorMessage(sout, t); } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java index 9e45bf0..7d84592 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java @@ -354,10 +354,15 @@ public class TajoClient implements Closeable { return this.getQueryResultAndWait(queryId); } } else { - if (response.hasResultSet() || response.hasTableDesc()) { - return createResultSet(this, response); - } else { + // If a non-forwarded insert into query + if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() < 0) { return this.createNullResultSet(queryId); + } else { + if (response.hasResultSet() || response.hasTableDesc()) { + return createResultSet(this, response); + } else { + return this.createNullResultSet(queryId); + } } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java index 52c9782..d5d2d47 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java @@ -1244,25 +1244,30 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex Schema tableSchema = insertNode.getTableSchema(); Schema targetColumns = insertNode.getTargetSchema(); - ProjectionNode projectionNode = insertNode.getChild(); - - // Modifying projected columns by adding NULL constants - // It is because that table appender does not support target columns to be written. - List<Target> targets = TUtil.newList(); - for (int i = 0, j = 0; i < tableSchema.size(); i++) { - Column column = tableSchema.getColumn(i); - - if(targetColumns.contains(column) && j < projectionNode.getTargets().length) { - targets.add(projectionNode.getTargets()[j++]); - } else { - targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName())); + LogicalNode child = insertNode.getChild(); + if (child instanceof Projectable) { + Projectable projectionNode = (Projectable) insertNode.getChild(); + + // Modifying projected columns by adding NULL constants + // It is because that table appender does not support target columns to be written. + List<Target> targets = TUtil.newList(); + for (int i = 0, j = 0; i < tableSchema.size(); i++) { + Column column = tableSchema.getColumn(i); + + if(targetColumns.contains(column) && j < projectionNode.getTargets().length) { + targets.add(projectionNode.getTargets()[j++]); + } else { + targets.add(new Target(new ConstEval(NullDatum.get()), column.getSimpleName())); + } } - } - projectionNode.setTargets(targets.toArray(new Target[targets.size()])); + projectionNode.setTargets(targets.toArray(new Target[targets.size()])); - insertNode.setInSchema(projectionNode.getOutSchema()); - insertNode.setOutSchema(projectionNode.getOutSchema()); - insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets)); + insertNode.setInSchema(projectionNode.getOutSchema()); + insertNode.setOutSchema(projectionNode.getOutSchema()); + insertNode.setProjectedSchema(PlannerUtil.targetToSchema(targets)); + } else { + throw new RuntimeException("Wrong child node type: " + child.getType() + " for insert"); + } } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java index a843bce..b1ab7c4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java @@ -29,6 +29,7 @@ import java.io.IOException; public class EvalExprExec extends PhysicalExec { private final EvalExprNode plan; private float progress; + private boolean executedOnce = false; public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) { super(context, plan.getInSchema(), plan.getOutSchema()); @@ -41,17 +42,24 @@ public class EvalExprExec extends PhysicalExec { } @Override - public Tuple next() throws IOException { - Target [] targets = plan.getTargets(); - Tuple t = new VTuple(targets.length); - for (int i = 0; i < targets.length; i++) { - t.put(i, targets[i].getEvalTree().eval(inSchema, null)); + public Tuple next() throws IOException { + if (!executedOnce) { + Target [] targets = plan.getTargets(); + Tuple t = new VTuple(targets.length); + for (int i = 0; i < targets.length; i++) { + t.put(i, targets[i].getEvalTree().eval(inSchema, null)); + } + + executedOnce = true; + return t; + } else { + return null; } - return t; } @Override - public void rescan() throws IOException { + public void rescan() throws IOException { + executedOnce = false; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 1f927a6..b0c3c31 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -16,9 +16,6 @@ * limitations under the License. */ -/** - * - */ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.CatalogUtil; @@ -93,7 +90,9 @@ public class StoreTableExec extends UnaryPhysicalExec { appender.close(); // Collect statistics data context.setResultStats(appender.getStats()); - context.addShuffleFileOutput(0, context.getTaskId().toString()); + if (context.getTaskId() != null) { + context.addShuffleFileOutput(0, context.getTaskId().toString()); + } } appender = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 6caf031..3b81ce2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -22,12 +22,14 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.TajoConstants; import org.apache.tajo.algebra.AlterTablespaceSetType; import org.apache.tajo.algebra.Expr; import org.apache.tajo.algebra.JsonHelper; @@ -35,6 +37,7 @@ import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.exception.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; @@ -46,13 +49,18 @@ import org.apache.tajo.engine.parser.HiveQLAnalyzer; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.*; import org.apache.tajo.engine.planner.logical.*; +import org.apache.tajo.engine.planner.physical.EvalExprExec; +import org.apache.tajo.engine.planner.physical.StoreTableExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.master.TajoMaster.MasterContext; +import org.apache.tajo.master.querymaster.Query; import org.apache.tajo.master.querymaster.QueryInfo; import org.apache.tajo.master.querymaster.QueryJobManager; +import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.session.Session; import org.apache.tajo.storage.*; +import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; import java.sql.SQLException; @@ -61,7 +69,6 @@ import java.util.List; import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto; -import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand; import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet; @@ -242,25 +249,29 @@ public class GlobalEngine extends AbstractService { if (targets == null) { throw new PlanningException("No targets"); } - Tuple outTuple = new VTuple(targets.length); + final Tuple outTuple = new VTuple(targets.length); for (int i = 0; i < targets.length; i++) { EvalNode eval = targets[i].getEvalTree(); outTuple.put(i, eval.eval(null, null)); } - - Schema schema = PlannerUtil.targetToSchema(targets); - RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); - byte [] serializedBytes = encoder.toBytes(outTuple); - SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder(); - serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); - serializedResBuilder.setSchema(schema.getProto()); - serializedResBuilder.setBytesNum(serializedBytes.length); - - responseBuilder.setResultSet(serializedResBuilder); - responseBuilder.setMaxRowNum(1); - responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); - responseBuilder.setResultCode(ClientProtos.ResultCode.OK); - + boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT; + if (isInsert) { + InsertNode insertNode = rootNode.getChild(); + insertNonFromQuery(insertNode, responseBuilder); + } else { + Schema schema = PlannerUtil.targetToSchema(targets); + RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema); + byte[] serializedBytes = encoder.toBytes(outTuple); + SerializedResultSet.Builder serializedResBuilder = SerializedResultSet.newBuilder(); + serializedResBuilder.addSerializedTuples(ByteString.copyFrom(serializedBytes)); + serializedResBuilder.setSchema(schema.getProto()); + serializedResBuilder.setBytesNum(serializedBytes.length); + + responseBuilder.setResultSet(serializedResBuilder); + responseBuilder.setMaxRowNum(1); + responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + } } else { // it requires distributed execution. So, the query is forwarded to a query master. context.getSystemMetrics().counter("Query", "numDMLQuery").inc(); hookManager.doHooks(queryContext, plan); @@ -289,6 +300,107 @@ public class GlobalEngine extends AbstractService { return response; } + private void insertNonFromQuery(InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) + throws Exception { + String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName(); + String queryId = nodeUniqName + "_" + System.currentTimeMillis(); + + FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf()); + Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), fs, queryId.toString()); + + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + fs.mkdirs(stagingResultDir); + + TableDesc tableDesc = null; + Path finalOutputDir = null; + if (insertNode.getTableName() != null) { + tableDesc = this.catalog.getTableDesc(insertNode.getTableName()); + finalOutputDir = tableDesc.getPath(); + } else { + finalOutputDir = insertNode.getPath(); + } + + TaskAttemptContext taskAttemptContext = + new TaskAttemptContext(context.getConf(), null, (CatalogProtos.FragmentProto[]) null, stagingDir); + taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000")); + + EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild()); + StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec); + try { + exec.init(); + exec.next(); + } finally { + exec.close(); + } + + if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO + // it moves the original table into the temporary location. + // Then it moves the new result table into the original table location. + // Upon failed, it recovers the original table if possible. + boolean movedToOldTable = false; + boolean committed = false; + Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); + try { + if (fs.exists(finalOutputDir)) { + fs.rename(finalOutputDir, oldTableDir); + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir.getParent()); + } + fs.rename(stagingResultDir, finalOutputDir); + committed = fs.exists(finalOutputDir); + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + fs.rename(oldTableDir, finalOutputDir); + } + } + } else { + FileStatus[] files = fs.listStatus(stagingResultDir); + for (FileStatus eachFile: files) { + Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName()); + if (fs.exists(targetFilePath)) { + targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis()); + } + fs.rename(eachFile.getPath(), targetFilePath); + } + } + + if (insertNode.hasTargetTable()) { + TableStats stats = tableDesc.getStats(); + long volume = Query.getTableVolume(context.getConf(), finalOutputDir); + stats.setNumBytes(volume); + stats.setNumRows(1); + + catalog.dropTable(insertNode.getTableName()); + catalog.createTable(tableDesc); + + responseBuilder.setTableDesc(tableDesc.getProto()); + } else { + TableStats stats = new TableStats(); + long volume = Query.getTableVolume(context.getConf(), finalOutputDir); + stats.setNumBytes(volume); + stats.setNumRows(1); + + // Empty TableDesc + List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>(); + CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder() + .setTableName(nodeUniqName) + .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType(CatalogProtos.StoreType.CSV).build()) + .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build()) + .setStats(stats.getProto()) + .build(); + + responseBuilder.setTableDesc(tableDescProto); + } + + // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows. + responseBuilder.setMaxRowNum(-1); + responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); + responseBuilder.setResultCode(ClientProtos.ResultCode.OK); + } + + public QueryId updateQuery(Session session, String sql, boolean isJson) throws IOException, SQLException, PlanningException { try { LOG.info("SQL: " + sql); @@ -395,7 +507,8 @@ public class GlobalEngine extends AbstractService { AlterTablespaceProto.Builder builder = AlterTablespaceProto.newBuilder(); builder.setSpaceName(spaceName); if (alterTablespace.getSetType() == AlterTablespaceSetType.LOCATION) { - AlterTablespaceCommand.Builder commandBuilder = AlterTablespaceCommand.newBuilder(); + AlterTablespaceProto.AlterTablespaceCommand.Builder commandBuilder = + AlterTablespaceProto.AlterTablespaceCommand.newBuilder(); commandBuilder.setType(AlterTablespaceProto.AlterTablespaceType.LOCATION); commandBuilder.setLocation(AlterTablespaceProto.SetLocation.newBuilder().setUri(alterTablespace.getLocation())); commandBuilder.build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 6a5248d..2848095 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -576,12 +576,12 @@ public class Query implements EventHandler<QueryEvent> { query.setResultDesc(finalTable); } } + } - private long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(systemConf); - ContentSummary directorySummary = fs.getContentSummary(tablePath); - return directorySummary.getLength(); - } + public static long getTableVolume(TajoConf systemConf, Path tablePath) throws IOException { + FileSystem fs = tablePath.getFileSystem(systemConf); + ContentSummary directorySummary = fs.getContentSummary(tablePath); + return directorySummary.getLength(); } public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> { http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 35d2cff..f812715 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -364,54 +364,19 @@ public class QueryMasterTask extends CompositeService { } } - /** - * It initializes the final output and staging directory and sets - * them to variables. - */ private void initStagingDir() throws IOException { - - String realUser; - String currentUser; - UserGroupInformation ugi; - ugi = UserGroupInformation.getLoginUser(); - realUser = ugi.getShortUserName(); - currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); - FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf); - Path stagingDir = null; Path outputDir = null; - try { - //////////////////////////////////////////// - // Create Output Directory - //////////////////////////////////////////// - - stagingDir = new Path(TajoConf.getStagingDir(systemConf), queryId.toString()); + FileSystem defaultFS = TajoConf.getWarehouseDir(systemConf).getFileSystem(systemConf); - if (defaultFS.exists(stagingDir)) { - throw new IOException("The staging directory '" + stagingDir + "' already exists"); - } - defaultFS.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); - FileStatus fsStatus = defaultFS.getFileStatus(stagingDir); - String owner = fsStatus.getOwner(); - - if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { - throw new IOException("The ownership on the user's query " + - "directory " + stagingDir + " is not as expected. " + - "It is owned by " + owner + ". The directory must " + - "be owned by the submitter " + currentUser + " or " + - "by " + realUser); - } + try { - if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { - LOG.info("Permissions on staging directory " + stagingDir + " are " + - "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + - "to correct value " + STAGING_DIR_PERMISSION); - defaultFS.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); - } + stagingDir = initStagingDir(systemConf, defaultFS, queryId.toString()); + defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME)); // Create a subdirectories - defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME)); LOG.info("The staging dir '" + stagingDir + "' is created."); + queryContext.setStagingDir(stagingDir); ///////////////////////////////////////////////// @@ -435,6 +400,52 @@ public class QueryMasterTask extends CompositeService { } } + /** + * It initializes the final output and staging directory and sets + * them to variables. + */ + public static Path initStagingDir(TajoConf conf, FileSystem fs, String queryId) throws IOException { + + String realUser; + String currentUser; + UserGroupInformation ugi; + ugi = UserGroupInformation.getLoginUser(); + realUser = ugi.getShortUserName(); + currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); + + Path stagingDir = null; + + //////////////////////////////////////////// + // Create Output Directory + //////////////////////////////////////////// + + stagingDir = new Path(TajoConf.getStagingDir(conf), queryId); + + if (fs.exists(stagingDir)) { + throw new IOException("The staging directory '" + stagingDir + "' already exists"); + } + fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + FileStatus fsStatus = fs.getFileStatus(stagingDir); + String owner = fsStatus.getOwner(); + + if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) { + throw new IOException("The ownership on the user's query " + + "directory " + stagingDir + " is not as expected. " + + "It is owned by " + owner + ". The directory must " + + "be owned by the submitter " + currentUser + " or " + + "by " + realUser); + } + + if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) { + LOG.info("Permissions on staging directory " + stagingDir + " are " + + "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + + "to correct value " + STAGING_DIR_PERMISSION); + fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION)); + } + + return stagingDir; + } + public Query getQuery() { return query; } http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 6f3281c..f42df1d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -74,14 +74,16 @@ public class TaskAttemptContext { final Path workDir) { this.conf = conf; this.queryId = queryId; - - for(FragmentProto t : fragments) { - if (fragmentMap.containsKey(t.getId())) { - fragmentMap.get(t.getId()).add(t); - } else { - List<FragmentProto> frags = new ArrayList<FragmentProto>(); - frags.add(t); - fragmentMap.put(t.getId(), frags); + + if (fragments != null) { + for (FragmentProto t : fragments) { + if (fragmentMap.containsKey(t.getId())) { + fragmentMap.get(t.getId()).add(t); + } else { + List<FragmentProto> frags = new ArrayList<FragmentProto>(); + frags.add(t); + fragmentMap.put(t.getId(), frags); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8e800469/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java index 06ce973..8453488 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java @@ -32,6 +32,8 @@ import org.apache.tajo.catalog.TableDesc; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.BufferedReader; +import java.io.InputStreamReader; import java.sql.ResultSet; import static org.junit.Assert.*; @@ -289,4 +291,96 @@ public class TestInsertQuery extends QueryTestCaseBase { assertEquals(5, desc.getStats().getNumRows().intValue()); } } + + @Test + public final void testInsertOverwriteTableWithNonFromQuery() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery"); + ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)"); + res.close(); + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), tableName)); + + res = executeString("insert overwrite into " + tableName + + " select 1::INT4, 2.1::FLOAT4, 'test'; "); + + res.close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName); + if (!testingCluster.isHCatalogStoreRunning()) { + assertEquals(1, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + tableName + ";"); + assertTrue(res.next()); + + assertEquals(3, res.getMetaData().getColumnCount()); + assertEquals(1, res.getInt(1)); + assertEquals(2.1f, res.getFloat(2), 10); + assertEquals("test", res.getString(3)); + + res.close(); + } + + @Test + public final void testInsertOverwriteTableWithNonFromQuery2() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("InsertOverwriteWithEvalQuery"); + ResultSet res = executeString("create table " + tableName +" (col1 int4, col2 float4, col3 text)"); + res.close(); + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(getCurrentDatabase(), tableName)); + + res = executeString("insert overwrite into " + tableName + " (col1, col3) select 1::INT4, 'test';"); + res.close(); + + TableDesc desc = catalog.getTableDesc(getCurrentDatabase(), tableName); + if (!testingCluster.isHCatalogStoreRunning()) { + assertEquals(1, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + tableName + ";"); + assertTrue(res.next()); + + assertEquals(3, res.getMetaData().getColumnCount()); + assertEquals(1, res.getInt(1)); + assertEquals("", res.getString(2)); + assertEquals("test", res.getString(3)); + + res.close(); + } + + @Test + public final void testInsertOverwritePathWithNonFromQuery() throws Exception { + ResultSet res = executeString("insert overwrite into location " + + "'/tajo-data/testInsertOverwritePathWithNonFromQuery' " + + "USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " + + "select 1::INT4, 2.1::FLOAT4, 'test'"); + + res.close(); + FileSystem fs = FileSystem.get(testingCluster.getConfiguration()); + Path path = new Path("/tajo-data/testInsertOverwritePathWithNonFromQuery"); + assertTrue(fs.exists(path)); + assertEquals(1, fs.listStatus(path).length); + + CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration()); + FileStatus file = fs.listStatus(path)[0]; + CompressionCodec codec = factory.getCodec(file.getPath()); + assertTrue(codec instanceof DeflateCodec); + + BufferedReader reader = new BufferedReader( + new InputStreamReader(codec.createInputStream(fs.open(file.getPath())))); + + try { + String line = reader.readLine(); + assertNotNull(line); + + String[] tokens = line.split("\\|"); + + assertEquals(3, tokens.length); + assertEquals("1", tokens[0]); + assertEquals("2.1", tokens[1]); + assertEquals("test", tokens[2]); + } finally { + reader.close(); + } + } }
