Repository: tajo Updated Branches: refs/heads/master 1fc670cf9 -> b8b7066a2
TAJO-2048: QueryMaster and TajoWorker should support the exception propagation. Closes #936 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b8b7066a Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b8b7066a Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b8b7066a Branch: refs/heads/master Commit: b8b7066a23d53dbf4456e15a0a7dd18162cb8254 Parents: 1fc670c Author: Jinho Kim <[email protected]> Authored: Fri Jan 22 16:04:20 2016 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Jan 22 16:04:20 2016 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../org/apache/tajo/exception/ErrorUtil.java | 25 ++++++++++++- .../apache/tajo/exception/ReturnStateUtil.java | 14 ++++++- .../tajo/cli/tsql/TestTajoCliNegatives.java | 2 +- .../apache/tajo/worker/MockExecutionBlock.java | 2 +- .../physical/RangeShuffleFileWriteExec.java | 3 +- .../java/org/apache/tajo/master/QueryInfo.java | 2 + .../org/apache/tajo/master/QueryManager.java | 5 ++- .../tajo/master/TajoMasterClientService.java | 12 +++--- .../tajo/master/event/StageTaskFailedEvent.java | 39 ++++++++++++++++++++ .../tajo/master/event/TaskFatalErrorEvent.java | 17 +++++---- .../master/event/TaskTAttemptFailedEvent.java | 36 ++++++++++++++++++ .../tajo/querymaster/DefaultTaskScheduler.java | 6 +-- .../java/org/apache/tajo/querymaster/Query.java | 21 ++++++++--- .../apache/tajo/querymaster/QueryMaster.java | 8 +++- .../tajo/querymaster/QueryMasterTask.java | 5 ++- .../java/org/apache/tajo/querymaster/Stage.java | 32 ++++++++++++---- .../java/org/apache/tajo/querymaster/Task.java | 17 ++++----- .../apache/tajo/querymaster/TaskAttempt.java | 8 ++-- .../tajo/worker/ExecutionBlockContext.java | 11 +++--- .../org/apache/tajo/worker/TaskContainer.java | 2 +- .../org/apache/tajo/worker/TaskExecutor.java | 3 +- .../java/org/apache/tajo/worker/TaskImpl.java | 16 ++------ tajo-core/src/main/proto/ResourceProtos.proto | 7 ++-- 24 files changed, 219 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 01634c3..0e7978e 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,9 @@ Release 0.12.0 - unreleased IMPROVEMENT + TAJO-2048: QueryMaster and TajoWorker should support the exception + propagation. (jinho) + TAJO-2050: Adopt TAJO logo in CLI. (Dongkyu Hwangbo via jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java index 025a20c..9a71bd6 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java @@ -18,6 +18,8 @@ package org.apache.tajo.exception; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tajo.error.Errors; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.error.Stacktrace; @@ -32,13 +34,32 @@ public class ErrorUtil { public static Stacktrace.StackTrace convertStacktrace(Throwable t) { Stacktrace.StackTrace.Builder builder = Stacktrace.StackTrace.newBuilder(); - for (StackTraceElement element: t.getStackTrace()) { + for (StackTraceElement element : t.getStackTrace()) { builder.addElement(Stacktrace.StackTrace.Element.newBuilder() - .setFilename(element.getFileName()) + .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName()) .setFunction(element.getClassName() + "::" + element.getMethodName()) .setLine(element.getLineNumber()) ); } return builder.build(); } + + public static Errors.SerializedException convertException(Throwable t) { + Errors.SerializedException.Builder builder = Errors.SerializedException.newBuilder(); + + if (ExceptionUtil.isExceptionWithResultCode(t)) { + DefaultTajoException tajoException = (DefaultTajoException) t; + builder.setReturnCode(tajoException.getErrorCode()); + builder.setMessage(tajoException.getMessage()); + } else { + Throwable rootCause = ExceptionUtils.getRootCause(t); + if(rootCause != null) t = rootCause; + + builder.setReturnCode(ResultCode.INTERNAL_ERROR); + builder.setMessage(ErrorMessages.getInternalErrorMessage(t)); + } + builder.setStackTrace(ErrorUtil.convertStacktrace(t)); + builder.setTimestamp(System.currentTimeMillis()); + return builder.build(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java index 3257f46..152442d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java @@ -20,6 +20,7 @@ package org.apache.tajo.exception; import com.google.common.base.Preconditions; import org.apache.tajo.QueryId; +import org.apache.tajo.error.Errors; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListResponse; @@ -83,9 +84,20 @@ public class ReturnStateUtil { } else { builder.setReturnCode(ResultCode.INTERNAL_ERROR); builder.setMessage(ErrorMessages.getInternalErrorMessage(t)); - builder.setStackTrace(ErrorUtil.convertStacktrace(t)); } + builder.setStackTrace(ErrorUtil.convertStacktrace(t)); + return builder.build(); + } + + public static ReturnState returnError(Errors.SerializedException e) { + ReturnState.Builder builder = ReturnState.newBuilder(); + + builder.setReturnCode(e.getReturnCode()); + builder.setMessage(e.getMessage()); + if (e.hasStackTrace()) { + builder.setStackTrace(e.getStackTrace()); + } return builder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java index 6d939de..fcf4546 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java @@ -141,6 +141,6 @@ public class TestTajoCliNegatives extends QueryTestCaseBase { public void testQueryFailure() throws Exception { setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); assertScriptFailure("select fail(3, l_orderkey, 'testQueryFailure') from default.lineitem where l_orderkey > 0" , - "ERROR: Internal error. Please check out log files in ${tajo_install_dir}/logs directory.\n"); + "ERROR: internal error: testQueryFailure\n"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java index 7d7fb1a..cbc4312 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java @@ -36,7 +36,7 @@ public class MockExecutionBlock extends ExecutionBlockContext { } @Override - public void fatalError(TaskAttemptId taskAttemptId, String message) { + public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) { } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index e4217b3..776a783 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -28,7 +28,6 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.plan.logical.ShuffleFileWriteNode; import org.apache.tajo.plan.util.PlannerUtil; @@ -74,7 +73,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); keyProjector = new KeyProjector(inSchema, keySchema.toArray()); - BSTIndex bst = new BSTIndex(new TajoConf()); + BSTIndex bst = new BSTIndex(context.getConf()); this.comp = new BaseTupleComparator(keySchema, sortSpecs); Path storeTablePath = new Path(context.getWorkDir(), "output"); LOG.info("Output data directory: " + storeTablePath); http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java index 6c324d9..eba633e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java @@ -44,6 +44,8 @@ public class QueryInfo implements GsonObject, History, Comparable<QueryInfo> { private volatile long startTime; @Expose private volatile long finishTime; + + @Deprecated @Expose private String lastMessage; @Expose http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 24ed830..b4f1d66 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -346,7 +346,10 @@ public class QueryManager extends CompositeService { queryInfo.setQueryMaster(connectionInfo.getHost()); queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort()); queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort()); - queryInfo.setLastMessage(queryHeartbeat.getStatusMessage()); + if(queryHeartbeat.hasError()) { + //TODO set error instead of last message + queryInfo.setLastMessage(queryHeartbeat.getError().getMessage()); + } queryInfo.setQueryState(queryHeartbeat.getState()); queryInfo.setProgress(queryHeartbeat.getQueryProgress()); http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index bb04229..bfba51d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -39,10 +39,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.QueryNotFoundException; -import org.apache.tajo.exception.ReturnStateUtil; -import org.apache.tajo.exception.UnavailableTableLocationException; -import org.apache.tajo.exception.UndefinedDatabaseException; +import org.apache.tajo.exception.*; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; @@ -61,6 +58,7 @@ import org.apache.tajo.session.Session; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.StringUtils; import java.net.InetSocketAddress; import java.util.*; @@ -109,7 +107,7 @@ public class TajoMasterClientService extends AbstractService { @Override public void serviceStop() throws Exception { if (server != null) { - server.shutdown(); + server.shutdown(true); } super.serviceStop(); } @@ -505,6 +503,10 @@ public class TajoMasterClientService extends AbstractService { builder.setFinishTime(queryInfo.getFinishTime()); } else { builder.setFinishTime(System.currentTimeMillis()); + + if(!StringUtils.isEmpty(queryInfo.getLastMessage())) { + builder.setErrorMessage(queryInfo.getLastMessage()); + } } } else { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java new file mode 100644 index 0000000..731134e --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.TaskId; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.master.TaskState; + +/** + * Event Class: From Task to Stage + */ +public class StageTaskFailedEvent extends StageTaskEvent { + private final SerializedException exception; + + public StageTaskFailedEvent(TaskId taskId, SerializedException exception) { + super(taskId, TaskState.FAILED); + this.exception = exception; + } + + public SerializedException getException() { + return exception; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java index d50fcb8..351a42b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java @@ -20,22 +20,23 @@ package org.apache.tajo.master.event; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.ResourceProtos.TaskFatalErrorReport; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; public class TaskFatalErrorEvent extends TaskAttemptEvent { - private final String message; + private final SerializedException error; public TaskFatalErrorEvent(TaskFatalErrorReport report) { - super(new TaskAttemptId(report.getId()), - TaskAttemptEventType.TA_FATAL_ERROR); - this.message = report.getErrorMessage(); + super(new TaskAttemptId(report.getId()), TaskAttemptEventType.TA_FATAL_ERROR); + this.error = report.getError(); } - public TaskFatalErrorEvent(TaskAttemptId attemptId, String message) { + public TaskFatalErrorEvent(TaskAttemptId attemptId, Throwable e) { super(attemptId, TaskAttemptEventType.TA_FATAL_ERROR); - this.message = message; + this.error = ErrorUtil.convertException(e); } - public String errorMessage() { - return message; + public SerializedException getError() { + return error; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java new file mode 100644 index 0000000..77e0c4a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.error.Errors.SerializedException; + +public class TaskTAttemptFailedEvent extends TaskTAttemptEvent { + private final SerializedException exception; + + public TaskTAttemptFailedEvent(TaskAttemptId attemptId, + SerializedException exception) { + super(attemptId, TaskEventType.T_ATTEMPT_FAILED); + this.exception = exception; + } + + public SerializedException getException() { + return exception; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 8535912..be30af2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -97,7 +97,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { @Override public void init(Configuration conf) { tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); - rpcParams = RpcParameterFactory.get(new TajoConf()); + rpcParams = RpcParameterFactory.get(tajoConf); scheduledRequests = new ScheduledRequests(); minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); @@ -116,11 +116,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { break; } else { LOG.fatal(e.getMessage(), e); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, e); } } catch (Throwable e) { LOG.fatal(e.getMessage(), e); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, e); break; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 82907ca..0cd178f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -44,6 +44,8 @@ import org.apache.tajo.engine.planner.global.ExecutionBlockCursor; import org.apache.tajo.engine.planner.global.ExecutionQueue; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.master.event.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; @@ -79,11 +81,12 @@ public class Query implements EventHandler<QueryEvent> { private long startTime; private long finishTime; private TableDesc resultDesc; - private int completedStagesCount = 0; - private int succeededStagesCount = 0; - private int killedStagesCount = 0; - private int failedStagesCount = 0; - private int erroredStagesCount = 0; + private volatile int completedStagesCount = 0; + private volatile int succeededStagesCount = 0; + private volatile int killedStagesCount = 0; + private volatile int failedStagesCount = 0; + private volatile int erroredStagesCount = 0; + private volatile SerializedException failureReason; private final List<String> diagnostics = new ArrayList<>(); // Internal Variables @@ -341,6 +344,10 @@ public class Query implements EventHandler<QueryEvent> { } } + public SerializedException getFailureReason() { + return failureReason; + } + public List<String> getDiagnostics() { readLock.lock(); try { @@ -530,6 +537,8 @@ public class Query implements EventHandler<QueryEvent> { query.clearPartitions(); } } catch (Throwable e) { + LOG.fatal(e.getMessage(), e); + query.failureReason = ErrorUtil.convertException(e); query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); return QueryState.QUERY_ERROR; } @@ -791,8 +800,10 @@ public class Query implements EventHandler<QueryEvent> { query.killedStagesCount++; } else if (castEvent.getState() == StageState.FAILED) { query.failedStagesCount++; + query.failureReason = query.getStage(castEvent.getExecutionBlockId()).getFailureReason(); } else if (castEvent.getState() == StageState.ERROR) { query.erroredStagesCount++; + query.failureReason = query.getStage(castEvent.getExecutionBlockId()).getFailureReason(); } else { LOG.error(String.format("Invalid Stage (%s) State %s at %s", castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name())); http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 7104fb9..adc7b08 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -37,7 +37,7 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.ReturnStateUtil; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.master.event.QueryStartEvent; @@ -351,9 +351,13 @@ public class QueryMaster extends CompositeService implements EventHandler { builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto()); } builder.setQueryProgress(queryMasterTask.getQuery().getProgress()); + + if(queryMasterTask.getQuery().getFailureReason() != null) { + builder.setError(queryMasterTask.getQuery().getFailureReason()); + } } if (queryMasterTask.isInitError()) { - builder.setStatusMessage(ReturnStateUtil.returnError(queryMasterTask.getInitError()).getMessage()); + builder.setError(ErrorUtil.convertException(queryMasterTask.getInitError())); } return builder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 995a8e5..6030d90 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -37,6 +37,7 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; @@ -266,12 +267,12 @@ public class QueryMasterTask extends CompositeService { if(!callFuture.get().getValue()){ getEventHandler().handle( - new TaskFatalErrorEvent(taskAttemptId, "Can't kill task :" + taskAttemptId)); + new TaskFatalErrorEvent(taskAttemptId, new TajoInternalError("Can't kill task :" + taskAttemptId))); } } catch (Exception e) { /* Node RPC failure */ LOG.error(e.getMessage(), e); - getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); + getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e)); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 5f050bf..85086e6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -42,7 +42,10 @@ import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.event.*; @@ -63,6 +66,7 @@ import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.RpcParameterFactory; +import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.util.history.TaskHistory; @@ -289,6 +293,7 @@ public class Stage implements EventHandler<StageEvent> { private volatile int succeededObjectCount = 0; private volatile int killedObjectCount = 0; private volatile int failedObjectCount = 0; + private volatile SerializedException failureReason; private TaskSchedulerContext schedulerContext; private List<IntermediateEntry> hashShuffleIntermediateEntries = Lists.newArrayList(); private AtomicInteger completedShuffleTasks = new AtomicInteger(0); @@ -412,6 +417,10 @@ public class Stage implements EventHandler<StageEvent> { return completedTaskCount; } + public SerializedException getFailureReason() { + return failureReason; + } + public ExecutionBlock getBlock() { return block; } @@ -533,18 +542,26 @@ public class Stage implements EventHandler<StageEvent> { eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED)); } + public void abort(StageState finalState) { + abort(finalState, null); + } + /** * It finalizes this stage. Unlike {@link Stage#complete()}, * it is invoked when a stage is abnormally finished. * * @param finalState The final stage state + * @param reason The failure reason, if exist */ - public void abort(StageState finalState) { + public void abort(StageState finalState, Throwable reason) { // TODO - // - committer.abortStage(...) // - record Stage Finish Time // - CleanUp Tasks // - Record History + if(reason != null) + failureReason = ErrorUtil.convertException(reason); + cleanup(); setFinishTime(); eventHandler.handle(new StageCompletedEvent(getId(), finalState)); @@ -1229,7 +1246,9 @@ public class Stage implements EventHandler<StageEvent> { } else if (task.getState() == TaskState.KILLED) { stage.killedObjectCount++; } else if (task.getState() == TaskState.FAILED) { + StageTaskFailedEvent failedEvent = TUtil.checkTypeAndGet(event, StageTaskFailedEvent.class); stage.failedObjectCount++; + stage.failureReason = failedEvent.getException(); // if at least one task is failed, try to kill all tasks. stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); } @@ -1407,17 +1426,14 @@ public class Stage implements EventHandler<StageEvent> { stage.getSucceededObjectCount(), stage.killedObjectCount)); - if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) { + // If the current stage are failed, next stages receives SQ_KILL event + if (stage.killedObjectCount + stage.failedObjectCount > 0) { if (stage.failedObjectCount > 0) { stage.abort(StageState.FAILED); return StageState.FAILED; - } else if (stage.killedObjectCount > 0) { + } else { stage.abort(StageState.KILLED); return StageState.KILLED; - } else { - LOG.error("Invalid State " + stage.getSynchronizedState() + " State"); - stage.abort(StageState.ERROR); - return StageState.ERROR; } } else { stage.complete(); @@ -1425,7 +1441,7 @@ public class Stage implements EventHandler<StageEvent> { } } catch (Throwable t) { LOG.error(t.getMessage(), t); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, t); return StageState.ERROR; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index c56bb0e..466f6c9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -45,6 +45,7 @@ import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.util.history.TaskHistory; @@ -624,10 +625,8 @@ public class Task implements EventHandler<TaskEvent> { private static class AttemptFailedTransition implements SingleArcTransition<Task, TaskEvent> { @Override public void transition(Task task, TaskEvent event) { - if (!(event instanceof TaskTAttemptEvent)) { - throw new IllegalArgumentException("event should be a TaskTAttemptEvent type."); - } - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; + TaskTAttemptFailedEvent attemptEvent = TUtil.checkTypeAndGet(event, TaskTAttemptFailedEvent.class); + LOG.info("============================================================="); LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<"); LOG.info("============================================================="); @@ -635,7 +634,7 @@ public class Task implements EventHandler<TaskEvent> { task.finishedAttempts++; task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED)); + task.eventHandler.handle(new StageTaskFailedEvent(task.getId(), attemptEvent.getException())); } } @@ -644,10 +643,8 @@ public class Task implements EventHandler<TaskEvent> { @Override public TaskState transition(Task task, TaskEvent taskEvent) { - if (!(taskEvent instanceof TaskTAttemptEvent)) { - throw new IllegalArgumentException("taskEvent should be a TaskTAttemptEvent type."); - } - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent; + TaskTAttemptFailedEvent attemptEvent = TUtil.checkTypeAndGet(taskEvent, TaskTAttemptFailedEvent.class); + task.failedAttempts++; task.finishedAttempts++; boolean retry = task.failedAttempts < task.maxAttempts; @@ -663,7 +660,7 @@ public class Task implements EventHandler<TaskEvent> { } } else { task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED)); + task.eventHandler.handle(new StageTaskFailedEvent(task.getId(), attemptEvent.getException())); return TaskState.FAILED; } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index b5ffc03..ed3002a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -409,7 +409,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { taskAttempt.fillTaskStatistics(report); taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); } catch (Throwable t) { - taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage())); + taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t)); taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t)); } } @@ -432,10 +432,10 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> { throw new IllegalArgumentException("event should be a TaskFatalErrorEvent type."); } TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; - taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); - taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(taskAttempt.getId(), errorEvent.getError())); + taskAttempt.addDiagnosticInfo(errorEvent.getError().getMessage()); LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost() - + " >> " + errorEvent.errorMessage()); + + " >> " + errorEvent.getError().getMessage()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 098567a..e675d70 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -38,8 +38,9 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.ErrorUtil; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; @@ -308,13 +309,13 @@ public class ExecutionBlockContext { return taskHistories; } - public void fatalError(TaskAttemptId taskAttemptId, String message) { - if (message == null) { - message = "No error message"; + public void fatalError(TaskAttemptId taskAttemptId, Throwable error) { + if (error == null) { + error = new TajoInternalError("No error message"); } TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder() .setId(taskAttemptId.getProto()) - .setErrorMessage(message); + .setError(ErrorUtil.convertException(error)); try { //If QueryMaster does not responding, current execution block should be stop http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java index bd28bb7..ac37258 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java @@ -71,7 +71,7 @@ public class TaskContainer implements Runnable { if (task != null) { try { task.abort(); - task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), e.getMessage()); + task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), e); } catch (Throwable t) { LOG.fatal(t.getMessage(), t); } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java index 57f3cd9..7476580 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java @@ -31,6 +31,7 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.TaskRequestImpl; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.resource.NodeResource; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; @@ -157,7 +158,7 @@ public class TaskExecutor extends AbstractService implements EventHandler<TaskSt if (executionBlockContext.getTasks().containsKey(taskAttemptId)) { String errorMessage = "Duplicate Task Attempt: " + taskAttemptId; LOG.error(errorMessage); - executionBlockContext.fatalError(taskAttemptId, errorMessage); + executionBlockContext.fatalError(taskAttemptId, new TajoInternalError(errorMessage)); } else { task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext); executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), task); http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 6d9639c..55eb02a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -21,7 +21,6 @@ package org.apache.tajo.worker; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +40,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.function.python.TajoScriptEngine; @@ -448,18 +448,10 @@ public class TaskImpl implements Task { executionBlockContext.killedTasksNum.incrementAndGet(); } else { context.setState(TaskAttemptState.TA_FAILED); - TaskFatalErrorReport.Builder errorBuilder = - TaskFatalErrorReport.newBuilder() - .setId(getId().getProto()); - if (error != null) { - if (error.getMessage() == null) { - errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); - } else { - errorBuilder.setErrorMessage(error.getMessage()); - } - errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); - } + TaskFatalErrorReport.Builder errorBuilder = TaskFatalErrorReport.newBuilder(); + errorBuilder.setId(getId().getProto()); + errorBuilder.setError(ErrorUtil.convertException(error)); queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); executionBlockContext.failedTasksNum.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/proto/ResourceProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto index 3643a97..74a475e 100644 --- a/tajo-core/src/main/proto/ResourceProtos.proto +++ b/tajo-core/src/main/proto/ResourceProtos.proto @@ -26,6 +26,8 @@ import "TajoIdProtos.proto"; import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; import "Plan.proto"; +import "errors.proto"; +import "stacktrace.proto"; enum ResponseCommand { NORMAL = 1; // ping @@ -114,8 +116,7 @@ message TaskCompletionReport { message TaskFatalErrorReport { required TaskAttemptIdProto id = 1; - optional string error_message = 2; - optional string error_trace = 3; + required tajo.error.SerializedException error = 2; } message FailureIntermediateProto { @@ -215,7 +216,7 @@ message TajoHeartbeatRequest { optional QueryIdProto query_id = 2; optional QueryState state = 3; optional TableDescProto result_desc = 4; - optional string status_message = 5; + optional tajo.error.SerializedException error = 5; optional float query_progress = 6; }
