Repository: tajo
Updated Branches:
  refs/heads/master 1fc670cf9 -> b8b7066a2


TAJO-2048: QueryMaster and TajoWorker should support the exception propagation.

Closes #936


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b8b7066a
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b8b7066a
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b8b7066a

Branch: refs/heads/master
Commit: b8b7066a23d53dbf4456e15a0a7dd18162cb8254
Parents: 1fc670c
Author: Jinho Kim <[email protected]>
Authored: Fri Jan 22 16:04:20 2016 +0900
Committer: Jinho Kim <[email protected]>
Committed: Fri Jan 22 16:04:20 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../org/apache/tajo/exception/ErrorUtil.java    | 25 ++++++++++++-
 .../apache/tajo/exception/ReturnStateUtil.java  | 14 ++++++-
 .../tajo/cli/tsql/TestTajoCliNegatives.java     |  2 +-
 .../apache/tajo/worker/MockExecutionBlock.java  |  2 +-
 .../physical/RangeShuffleFileWriteExec.java     |  3 +-
 .../java/org/apache/tajo/master/QueryInfo.java  |  2 +
 .../org/apache/tajo/master/QueryManager.java    |  5 ++-
 .../tajo/master/TajoMasterClientService.java    | 12 +++---
 .../tajo/master/event/StageTaskFailedEvent.java | 39 ++++++++++++++++++++
 .../tajo/master/event/TaskFatalErrorEvent.java  | 17 +++++----
 .../master/event/TaskTAttemptFailedEvent.java   | 36 ++++++++++++++++++
 .../tajo/querymaster/DefaultTaskScheduler.java  |  6 +--
 .../java/org/apache/tajo/querymaster/Query.java | 21 ++++++++---
 .../apache/tajo/querymaster/QueryMaster.java    |  8 +++-
 .../tajo/querymaster/QueryMasterTask.java       |  5 ++-
 .../java/org/apache/tajo/querymaster/Stage.java | 32 ++++++++++++----
 .../java/org/apache/tajo/querymaster/Task.java  | 17 ++++-----
 .../apache/tajo/querymaster/TaskAttempt.java    |  8 ++--
 .../tajo/worker/ExecutionBlockContext.java      | 11 +++---
 .../org/apache/tajo/worker/TaskContainer.java   |  2 +-
 .../org/apache/tajo/worker/TaskExecutor.java    |  3 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   | 16 ++------
 tajo-core/src/main/proto/ResourceProtos.proto   |  7 ++--
 24 files changed, 219 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 01634c3..0e7978e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,9 @@ Release 0.12.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-2048: QueryMaster and TajoWorker should support the exception
+    propagation. (jinho)
+
     TAJO-2050: Adopt TAJO logo in CLI.
     (Dongkyu Hwangbo via jaehwa)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java 
b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
index 025a20c..9a71bd6 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.exception;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tajo.error.Errors;
 import org.apache.tajo.error.Errors.ResultCode;
 import org.apache.tajo.error.Stacktrace;
 
@@ -32,13 +34,32 @@ public class ErrorUtil {
 
   public static Stacktrace.StackTrace convertStacktrace(Throwable t) {
     Stacktrace.StackTrace.Builder builder = Stacktrace.StackTrace.newBuilder();
-    for (StackTraceElement element: t.getStackTrace()) {
+    for (StackTraceElement element : t.getStackTrace()) {
       builder.addElement(Stacktrace.StackTrace.Element.newBuilder()
-              .setFilename(element.getFileName())
+              .setFilename(element.getFileName() == null ? "(Unknown Source)" 
: element.getFileName())
               .setFunction(element.getClassName() + "::" + 
element.getMethodName())
               .setLine(element.getLineNumber())
       );
     }
     return builder.build();
   }
+
+  public static Errors.SerializedException convertException(Throwable t) {
+    Errors.SerializedException.Builder builder = 
Errors.SerializedException.newBuilder();
+
+    if (ExceptionUtil.isExceptionWithResultCode(t)) {
+      DefaultTajoException tajoException = (DefaultTajoException) t;
+      builder.setReturnCode(tajoException.getErrorCode());
+      builder.setMessage(tajoException.getMessage());
+    } else {
+      Throwable rootCause = ExceptionUtils.getRootCause(t);
+      if(rootCause != null) t = rootCause;
+
+      builder.setReturnCode(ResultCode.INTERNAL_ERROR);
+      builder.setMessage(ErrorMessages.getInternalErrorMessage(t));
+    }
+    builder.setStackTrace(ErrorUtil.convertStacktrace(t));
+    builder.setTimestamp(System.currentTimeMillis());
+    return builder.build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
----------------------------------------------------------------------
diff --git 
a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java 
b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
index 3257f46..152442d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
@@ -20,6 +20,7 @@ package org.apache.tajo.exception;
 
 import com.google.common.base.Preconditions;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.error.Errors;
 import org.apache.tajo.error.Errors.ResultCode;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListResponse;
@@ -83,9 +84,20 @@ public class ReturnStateUtil {
     } else {
       builder.setReturnCode(ResultCode.INTERNAL_ERROR);
       builder.setMessage(ErrorMessages.getInternalErrorMessage(t));
-      builder.setStackTrace(ErrorUtil.convertStacktrace(t));
     }
 
+    builder.setStackTrace(ErrorUtil.convertStacktrace(t));
+    return builder.build();
+  }
+
+  public static ReturnState returnError(Errors.SerializedException e) {
+    ReturnState.Builder builder = ReturnState.newBuilder();
+
+    builder.setReturnCode(e.getReturnCode());
+    builder.setMessage(e.getMessage());
+    if (e.hasStackTrace()) {
+      builder.setStackTrace(e.getStackTrace());
+    }
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
 
b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
index 6d939de..fcf4546 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java
@@ -141,6 +141,6 @@ public class TestTajoCliNegatives extends QueryTestCaseBase 
{
   public void testQueryFailure() throws Exception {
     setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, 
TajoCliOutputTestFormatter.class.getName());
     assertScriptFailure("select fail(3, l_orderkey, 'testQueryFailure') from 
default.lineitem where l_orderkey > 0" ,
-        "ERROR: Internal error. Please check out log files in 
${tajo_install_dir}/logs directory.\n");
+        "ERROR: internal error: testQueryFailure\n");
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
----------------------------------------------------------------------
diff --git 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
index 7d7fb1a..cbc4312 100644
--- 
a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
+++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java
@@ -36,7 +36,7 @@ public class MockExecutionBlock extends ExecutionBlockContext 
{
   }
 
   @Override
-  public void fatalError(TaskAttemptId taskAttemptId, String message) {
+  public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) {
 
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index e4217b3..776a783 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -28,7 +28,6 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.KeyProjector;
 import org.apache.tajo.plan.logical.ShuffleFileWriteNode;
 import org.apache.tajo.plan.util.PlannerUtil;
@@ -74,7 +73,7 @@ public class RangeShuffleFileWriteExec extends 
UnaryPhysicalExec {
     keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
     keyProjector = new KeyProjector(inSchema, keySchema.toArray());
 
-    BSTIndex bst = new BSTIndex(new TajoConf());
+    BSTIndex bst = new BSTIndex(context.getConf());
     this.comp = new BaseTupleComparator(keySchema, sortSpecs);
     Path storeTablePath = new Path(context.getWorkDir(), "output");
     LOG.info("Output data directory: " + storeTablePath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java 
b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
index 6c324d9..eba633e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java
@@ -44,6 +44,8 @@ public class QueryInfo implements GsonObject, History, 
Comparable<QueryInfo> {
   private volatile long startTime;
   @Expose
   private volatile  long finishTime;
+
+  @Deprecated
   @Expose
   private String lastMessage;
   @Expose

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java 
b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 24ed830..b4f1d66 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -346,7 +346,10 @@ public class QueryManager extends CompositeService {
     queryInfo.setQueryMaster(connectionInfo.getHost());
     queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort());
     queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort());
-    queryInfo.setLastMessage(queryHeartbeat.getStatusMessage());
+    if(queryHeartbeat.hasError()) {
+      //TODO set error instead of last message
+      queryInfo.setLastMessage(queryHeartbeat.getError().getMessage());
+    }
     queryInfo.setQueryState(queryHeartbeat.getState());
     queryInfo.setProgress(queryHeartbeat.getQueryProgress());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index bb04229..bfba51d 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -39,10 +39,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.exception.QueryNotFoundException;
-import org.apache.tajo.exception.ReturnStateUtil;
-import org.apache.tajo.exception.UnavailableTableLocationException;
-import org.apache.tajo.exception.UndefinedDatabaseException;
+import org.apache.tajo.exception.*;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
@@ -61,6 +58,7 @@ import org.apache.tajo.session.Session;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.StringUtils;
 
 import java.net.InetSocketAddress;
 import java.util.*;
@@ -109,7 +107,7 @@ public class TajoMasterClientService extends 
AbstractService {
   @Override
   public void serviceStop() throws Exception {
     if (server != null) {
-      server.shutdown();
+      server.shutdown(true);
     }
     super.serviceStop();
   }
@@ -505,6 +503,10 @@ public class TajoMasterClientService extends 
AbstractService {
               builder.setFinishTime(queryInfo.getFinishTime());
             } else {
               builder.setFinishTime(System.currentTimeMillis());
+
+              if(!StringUtils.isEmpty(queryInfo.getLastMessage())) {
+                builder.setErrorMessage(queryInfo.getLastMessage());
+              }
             }
           } else {
             Session session = 
context.getSessionManager().getSession(request.getSessionId().getId());

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java
new file mode 100644
index 0000000..731134e
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.TaskId;
+import org.apache.tajo.error.Errors.SerializedException;
+import org.apache.tajo.master.TaskState;
+
+/**
+ * Event Class: From Task to Stage
+ */
+public class StageTaskFailedEvent extends StageTaskEvent {
+  private final SerializedException exception;
+
+  public StageTaskFailedEvent(TaskId taskId, SerializedException exception) {
+    super(taskId, TaskState.FAILED);
+    this.exception = exception;
+  }
+
+  public SerializedException getException() {
+    return exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java 
b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index d50fcb8..351a42b 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -20,22 +20,23 @@ package org.apache.tajo.master.event;
 
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.ResourceProtos.TaskFatalErrorReport;
+import org.apache.tajo.error.Errors.SerializedException;
+import org.apache.tajo.exception.ErrorUtil;
 
 public class TaskFatalErrorEvent extends TaskAttemptEvent {
-  private final String message;
+  private final SerializedException error;
 
   public TaskFatalErrorEvent(TaskFatalErrorReport report) {
-    super(new TaskAttemptId(report.getId()),
-        TaskAttemptEventType.TA_FATAL_ERROR);
-    this.message = report.getErrorMessage();
+    super(new TaskAttemptId(report.getId()), 
TaskAttemptEventType.TA_FATAL_ERROR);
+    this.error = report.getError();
   }
 
-  public TaskFatalErrorEvent(TaskAttemptId attemptId, String message) {
+  public TaskFatalErrorEvent(TaskAttemptId attemptId, Throwable e) {
     super(attemptId, TaskAttemptEventType.TA_FATAL_ERROR);
-    this.message = message;
+    this.error = ErrorUtil.convertException(e);
   }
 
-  public String errorMessage() {
-    return message;
+  public SerializedException getError() {
+    return error;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java
new file mode 100644
index 0000000..77e0c4a
--- /dev/null
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.error.Errors.SerializedException;
+
+public class TaskTAttemptFailedEvent extends TaskTAttemptEvent {
+  private final SerializedException exception;
+
+  public TaskTAttemptFailedEvent(TaskAttemptId attemptId,
+                                 SerializedException exception) {
+    super(attemptId, TaskEventType.T_ATTEMPT_FAILED);
+    this.exception = exception;
+  }
+
+  public SerializedException getException() {
+    return exception;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
index 8535912..be30af2 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java
@@ -97,7 +97,7 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
   @Override
   public void init(Configuration conf) {
     tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
-    rpcParams = RpcParameterFactory.get(new TajoConf());
+    rpcParams = RpcParameterFactory.get(tajoConf);
 
     scheduledRequests = new ScheduledRequests();
     minTaskMemory = 
tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY);
@@ -116,11 +116,11 @@ public class DefaultTaskScheduler extends 
AbstractTaskScheduler {
               break;
             } else {
               LOG.fatal(e.getMessage(), e);
-              stage.abort(StageState.ERROR);
+              stage.abort(StageState.ERROR, e);
             }
           } catch (Throwable e) {
             LOG.fatal(e.getMessage(), e);
-            stage.abort(StageState.ERROR);
+            stage.abort(StageState.ERROR, e);
             break;
           }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 82907ca..0cd178f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -44,6 +44,8 @@ import 
org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.ExecutionQueue;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.error.Errors.SerializedException;
+import org.apache.tajo.exception.ErrorUtil;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
@@ -79,11 +81,12 @@ public class Query implements EventHandler<QueryEvent> {
   private long startTime;
   private long finishTime;
   private TableDesc resultDesc;
-  private int completedStagesCount = 0;
-  private int succeededStagesCount = 0;
-  private int killedStagesCount = 0;
-  private int failedStagesCount = 0;
-  private int erroredStagesCount = 0;
+  private volatile int completedStagesCount = 0;
+  private volatile int succeededStagesCount = 0;
+  private volatile int killedStagesCount = 0;
+  private volatile int failedStagesCount = 0;
+  private volatile int erroredStagesCount = 0;
+  private volatile SerializedException failureReason;
   private final List<String> diagnostics = new ArrayList<>();
 
   // Internal Variables
@@ -341,6 +344,10 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
+  public SerializedException getFailureReason() {
+    return failureReason;
+  }
+
   public List<String> getDiagnostics() {
     readLock.lock();
     try {
@@ -530,6 +537,8 @@ public class Query implements EventHandler<QueryEvent> {
           query.clearPartitions();
         }
       } catch (Throwable e) {
+        LOG.fatal(e.getMessage(), e);
+        query.failureReason = ErrorUtil.convertException(e);
         query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, 
ExceptionUtils.getStackTrace(e)));
         return QueryState.QUERY_ERROR;
       }
@@ -791,8 +800,10 @@ public class Query implements EventHandler<QueryEvent> {
           query.killedStagesCount++;
         } else if (castEvent.getState() == StageState.FAILED) {
           query.failedStagesCount++;
+          query.failureReason = 
query.getStage(castEvent.getExecutionBlockId()).getFailureReason();
         } else if (castEvent.getState() == StageState.ERROR) {
           query.erroredStagesCount++;
+          query.failureReason = 
query.getStage(castEvent.getExecutionBlockId()).getFailureReason();
         } else {
           LOG.error(String.format("Invalid Stage (%s) State %s at %s",
               castEvent.getExecutionBlockId().toString(), 
castEvent.getState().name(), query.getSynchronizedState().name()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index 7104fb9..adc7b08 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -37,7 +37,7 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.exception.ReturnStateUtil;
+import org.apache.tajo.exception.ErrorUtil;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import 
org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService;
 import org.apache.tajo.master.event.QueryStartEvent;
@@ -351,9 +351,13 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
         
builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto());
       }
       builder.setQueryProgress(queryMasterTask.getQuery().getProgress());
+
+      if(queryMasterTask.getQuery().getFailureReason() != null) {
+        builder.setError(queryMasterTask.getQuery().getFailureReason());
+      }
     }
     if (queryMasterTask.isInitError()) {
-      
builder.setStatusMessage(ReturnStateUtil.returnError(queryMasterTask.getInitError()).getMessage());
+      
builder.setError(ErrorUtil.convertException(queryMasterTask.getInitError()));
     }
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 995a8e5..6030d90 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -37,6 +37,7 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
@@ -266,12 +267,12 @@ public class QueryMasterTask extends CompositeService {
 
       if(!callFuture.get().getValue()){
         getEventHandler().handle(
-            new TaskFatalErrorEvent(taskAttemptId, "Can't kill task :" + 
taskAttemptId));
+            new TaskFatalErrorEvent(taskAttemptId, new 
TajoInternalError("Can't kill task :" + taskAttemptId)));
       }
     } catch (Exception e) {
       /* Node RPC failure */
       LOG.error(e.getMessage(), e);
-      getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, 
e.getMessage()));
+      getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 5f050bf..85086e6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -42,7 +42,10 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.error.Errors.SerializedException;
+import org.apache.tajo.exception.ErrorUtil;
 import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TaskState;
 import org.apache.tajo.master.event.*;
@@ -63,6 +66,7 @@ import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.RpcParameterFactory;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.StageHistory;
 import org.apache.tajo.util.history.TaskHistory;
 
@@ -289,6 +293,7 @@ public class Stage implements EventHandler<StageEvent> {
   private volatile int succeededObjectCount = 0;
   private volatile int killedObjectCount = 0;
   private volatile int failedObjectCount = 0;
+  private volatile SerializedException failureReason;
   private TaskSchedulerContext schedulerContext;
   private List<IntermediateEntry> hashShuffleIntermediateEntries = 
Lists.newArrayList();
   private AtomicInteger completedShuffleTasks = new AtomicInteger(0);
@@ -412,6 +417,10 @@ public class Stage implements EventHandler<StageEvent> {
     return completedTaskCount;
   }
 
+  public SerializedException getFailureReason() {
+    return failureReason;
+  }
+
   public ExecutionBlock getBlock() {
     return block;
   }
@@ -533,18 +542,26 @@ public class Stage implements EventHandler<StageEvent> {
     eventHandler.handle(new StageCompletedEvent(getId(), 
StageState.SUCCEEDED));
   }
 
+  public void abort(StageState finalState) {
+    abort(finalState, null);
+  }
+
   /**
    * It finalizes this stage. Unlike {@link Stage#complete()},
    * it is invoked when a stage is abnormally finished.
    *
    * @param finalState The final stage state
+   * @param reason The failure reason, if exist
    */
-  public void abort(StageState finalState) {
+  public void abort(StageState finalState, Throwable reason) {
     // TODO -
     // - committer.abortStage(...)
     // - record Stage Finish Time
     // - CleanUp Tasks
     // - Record History
+    if(reason != null)
+      failureReason = ErrorUtil.convertException(reason);
+
     cleanup();
     setFinishTime();
     eventHandler.handle(new StageCompletedEvent(getId(), finalState));
@@ -1229,7 +1246,9 @@ public class Stage implements EventHandler<StageEvent> {
         } else if (task.getState() == TaskState.KILLED) {
           stage.killedObjectCount++;
         } else if (task.getState() == TaskState.FAILED) {
+          StageTaskFailedEvent failedEvent = TUtil.checkTypeAndGet(event, 
StageTaskFailedEvent.class);
           stage.failedObjectCount++;
+          stage.failureReason = failedEvent.getException();
           // if at least one task is failed, try to kill all tasks.
           stage.eventHandler.handle(new StageEvent(stage.getId(), 
StageEventType.SQ_KILL));
         }
@@ -1407,17 +1426,14 @@ public class Stage implements EventHandler<StageEvent> {
             stage.getSucceededObjectCount(),
             stage.killedObjectCount));
 
-        if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) {
+        // If the current stage are failed, next stages receives SQ_KILL event
+        if (stage.killedObjectCount + stage.failedObjectCount > 0) {
           if (stage.failedObjectCount > 0) {
             stage.abort(StageState.FAILED);
             return StageState.FAILED;
-          } else if (stage.killedObjectCount > 0) {
+          } else {
             stage.abort(StageState.KILLED);
             return StageState.KILLED;
-          } else {
-            LOG.error("Invalid State " + stage.getSynchronizedState() + " 
State");
-            stage.abort(StageState.ERROR);
-            return StageState.ERROR;
           }
         } else {
           stage.complete();
@@ -1425,7 +1441,7 @@ public class Stage implements EventHandler<StageEvent> {
         }
       } catch (Throwable t) {
         LOG.error(t.getMessage(), t);
-        stage.abort(StageState.ERROR);
+        stage.abort(StageState.ERROR, t);
         return StageState.ERROR;
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
index c56bb0e..466f6c9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java
@@ -45,6 +45,7 @@ import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.TajoIdUtils;
 import org.apache.tajo.util.history.TaskHistory;
 
@@ -624,10 +625,8 @@ public class Task implements EventHandler<TaskEvent> {
   private static class AttemptFailedTransition implements 
SingleArcTransition<Task, TaskEvent> {
     @Override
     public void transition(Task task, TaskEvent event) {
-      if (!(event instanceof TaskTAttemptEvent)) {
-        throw new IllegalArgumentException("event should be a 
TaskTAttemptEvent type.");
-      }
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      TaskTAttemptFailedEvent attemptEvent = TUtil.checkTypeAndGet(event, 
TaskTAttemptFailedEvent.class);
+
       
LOG.info("=============================================================");
       LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
       
LOG.info("=============================================================");
@@ -635,7 +634,7 @@ public class Task implements EventHandler<TaskEvent> {
       task.finishedAttempts++;
 
       task.finishTask();
-      task.eventHandler.handle(new StageTaskEvent(task.getId(), 
TaskState.FAILED));
+      task.eventHandler.handle(new StageTaskFailedEvent(task.getId(), 
attemptEvent.getException()));
     }
   }
 
@@ -644,10 +643,8 @@ public class Task implements EventHandler<TaskEvent> {
 
     @Override
     public TaskState transition(Task task, TaskEvent taskEvent) {
-      if (!(taskEvent instanceof TaskTAttemptEvent)) {
-        throw new IllegalArgumentException("taskEvent should be a 
TaskTAttemptEvent type.");
-      }
-      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent;
+      TaskTAttemptFailedEvent attemptEvent = TUtil.checkTypeAndGet(taskEvent, 
TaskTAttemptFailedEvent.class);
+
       task.failedAttempts++;
       task.finishedAttempts++;
       boolean retry = task.failedAttempts < task.maxAttempts;
@@ -663,7 +660,7 @@ public class Task implements EventHandler<TaskEvent> {
         }
       } else {
         task.finishTask();
-        task.eventHandler.handle(new StageTaskEvent(task.getId(), 
TaskState.FAILED));
+        task.eventHandler.handle(new StageTaskFailedEvent(task.getId(), 
attemptEvent.getException()));
         return TaskState.FAILED;
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java 
b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
index b5ffc03..ed3002a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java
@@ -409,7 +409,7 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
         taskAttempt.fillTaskStatistics(report);
         taskAttempt.eventHandler.handle(new 
TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED));
       } catch (Throwable t) {
-        taskAttempt.eventHandler.handle(new 
TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage()));
+        taskAttempt.eventHandler.handle(new 
TaskFatalErrorEvent(taskAttempt.getId(), t));
         taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t));
       }
     }
@@ -432,10 +432,10 @@ public class TaskAttempt implements 
EventHandler<TaskAttemptEvent> {
         throw new IllegalArgumentException("event should be a 
TaskFatalErrorEvent type.");
       }
       TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event;
-      taskAttempt.eventHandler.handle(new 
TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED));
-      taskAttempt.addDiagnosticInfo(errorEvent.errorMessage());
+      taskAttempt.eventHandler.handle(new 
TaskTAttemptFailedEvent(taskAttempt.getId(), errorEvent.getError()));
+      taskAttempt.addDiagnosticInfo(errorEvent.getError().getMessage());
       LOG.error(taskAttempt.getId() + " FROM " + 
taskAttempt.getWorkerConnectionInfo().getHost()
-          + " >> " + errorEvent.errorMessage());
+          + " >> " + errorEvent.getError().getMessage());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index 098567a..e675d70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -38,8 +38,9 @@ import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.ErrorUtil;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.serder.PlanProto;
@@ -308,13 +309,13 @@ public class ExecutionBlockContext {
     return taskHistories;
   }
 
-  public void fatalError(TaskAttemptId taskAttemptId, String message) {
-    if (message == null) {
-      message = "No error message";
+  public void fatalError(TaskAttemptId taskAttemptId, Throwable error) {
+    if (error == null) {
+      error = new TajoInternalError("No error message");
     }
     TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
         .setId(taskAttemptId.getProto())
-        .setErrorMessage(message);
+        .setError(ErrorUtil.convertException(error));
 
     try {
       //If QueryMaster does not responding, current execution block should be 
stop

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
index bd28bb7..ac37258 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
@@ -71,7 +71,7 @@ public class TaskContainer implements Runnable {
         if (task != null) {
           try {
             task.abort();
-            
task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), 
e.getMessage());
+            
task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), 
e);
           } catch (Throwable t) {
             LOG.fatal(t.getMessage(), t);
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
index 57f3cd9..7476580 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java
@@ -31,6 +31,7 @@ import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.TaskRequestImpl;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.resource.NodeResource;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
@@ -157,7 +158,7 @@ public class TaskExecutor extends AbstractService 
implements EventHandler<TaskSt
     if (executionBlockContext.getTasks().containsKey(taskAttemptId)) {
       String errorMessage = "Duplicate Task Attempt: " + taskAttemptId;
       LOG.error(errorMessage);
-      executionBlockContext.fatalError(taskAttemptId, errorMessage);
+      executionBlockContext.fatalError(taskAttemptId, new 
TajoInternalError(errorMessage));
     } else {
       task = new TaskImpl(new TaskRequestImpl(taskRequest), 
executionBlockContext);
       executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), 
task);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 6d9639c..55eb02a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -21,7 +21,6 @@ package org.apache.tajo.worker;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +40,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.exception.ErrorUtil;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.plan.function.python.TajoScriptEngine;
@@ -448,18 +448,10 @@ public class TaskImpl implements Task {
           executionBlockContext.killedTasksNum.incrementAndGet();
         } else {
           context.setState(TaskAttemptState.TA_FAILED);
-          TaskFatalErrorReport.Builder errorBuilder =
-              TaskFatalErrorReport.newBuilder()
-                  .setId(getId().getProto());
-          if (error != null) {
-            if (error.getMessage() == null) {
-              
errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
-            } else {
-              errorBuilder.setErrorMessage(error.getMessage());
-            }
-            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
-          }
+          TaskFatalErrorReport.Builder errorBuilder = 
TaskFatalErrorReport.newBuilder();
 
+          errorBuilder.setId(getId().getProto());
+          errorBuilder.setError(ErrorUtil.convertException(error));
           queryMasterStub.fatalError(null, errorBuilder.build(), 
NullCallback.get());
           executionBlockContext.failedTasksNum.incrementAndGet();
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b8b7066a/tajo-core/src/main/proto/ResourceProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceProtos.proto 
b/tajo-core/src/main/proto/ResourceProtos.proto
index 3643a97..74a475e 100644
--- a/tajo-core/src/main/proto/ResourceProtos.proto
+++ b/tajo-core/src/main/proto/ResourceProtos.proto
@@ -26,6 +26,8 @@ import "TajoIdProtos.proto";
 import "CatalogProtos.proto";
 import "PrimitiveProtos.proto";
 import "Plan.proto";
+import "errors.proto";
+import "stacktrace.proto";
 
 enum ResponseCommand {
   NORMAL = 1;  // ping
@@ -114,8 +116,7 @@ message TaskCompletionReport {
 
 message TaskFatalErrorReport {
   required TaskAttemptIdProto id = 1;
-  optional string error_message = 2;
-  optional string error_trace = 3;
+  required tajo.error.SerializedException error = 2;
 }
 
 message FailureIntermediateProto {
@@ -215,7 +216,7 @@ message TajoHeartbeatRequest {
   optional QueryIdProto query_id = 2;
   optional QueryState state = 3;
   optional TableDescProto result_desc = 4;
-  optional string status_message = 5;
+  optional tajo.error.SerializedException error = 5;
   optional float query_progress = 6;
 }
 

Reply via email to