Repository: tajo Updated Branches: refs/heads/master 5a7e27254 -> 9e026a9a2
TAJO-1025: Network disconnection during query processing can cause infinite exceptions. (Jihun Kang via jinho) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9e026a9a Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9e026a9a Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9e026a9a Branch: refs/heads/master Commit: 9e026a9a26a1004d4742f0e44d5b5d71691195ad Parents: 5a7e272 Author: jhkim <[email protected]> Authored: Wed Sep 24 15:45:04 2014 +0900 Committer: jhkim <[email protected]> Committed: Wed Sep 24 15:45:04 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 +++ .../tajo/worker/ExecutionBlockContext.java | 4 ++- .../java/org/apache/tajo/worker/TaskRunner.java | 26 +++++++++++++++++++- .../apache/tajo/rpc/RemoteCallException.java | 3 +++ 4 files changed, 34 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 3372eb6..7cbd524 100644 --- a/CHANGES +++ b/CHANGES @@ -152,6 +152,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-1025: Network disconnection during query processing can cause + infinite exceptions. (Jihun Kang via jinho) + TAJO-1047: DefaultTaskScheduler:allocateRackTask is failed occasionally on JDK 1.7. (jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 1ec8a88..f18723f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -43,6 +43,7 @@ import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; import org.apache.tajo.worker.event.TaskRunnerStartEvent; +import org.jboss.netty.channel.ConnectTimeoutException; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.util.Timer; @@ -146,7 +147,8 @@ public class ExecutionBlockContext { return resource; } - public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() throws Exception { + public QueryMasterProtocol.QueryMasterProtocolService.Interface getQueryMasterStub() + throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { NettyClientBase clientBase = null; try { clientBase = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true); http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index e4771a6..1910575 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -35,6 +35,7 @@ import org.apache.tajo.engine.query.QueryUnitRequestImpl; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NullCallback; +import org.jboss.netty.channel.ConnectTimeoutException; import java.util.concurrent.*; @@ -180,6 +181,7 @@ public class TaskRunner extends AbstractService { try { taskLauncher = new Thread(new Runnable() { + @Override public void run() { int receivedNum = 0; @@ -190,7 +192,20 @@ public class TaskRunner extends AbstractService { QueryMasterProtocolService.Interface qmClientService; try { qmClientService = getContext().getQueryMasterStub(); + } catch (ConnectTimeoutException ce) { + // NettyClientBase throws ConnectTimeoutException if connection was failed + stop(); + getContext().stopTaskRunner(getId()); + LOG.error("Connecting to QueryMaster was failed.", ce); + break; + } catch (Throwable t) { + LOG.fatal("Unable to handle exception: " + t.getMessage(), t); + stop(); + getContext().stopTaskRunner(getId()); + break; + } + try { if (callFuture == null) { callFuture = new CallFuture<QueryUnitRequestProto>(); LOG.info("Request GetTask: " + getId()); @@ -200,7 +215,7 @@ public class TaskRunner extends AbstractService { .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId()) .build(); - qmClientService.getTask(null, request, callFuture); + qmClientService.getTask(callFuture.getController(), request, callFuture); } try { // wait for an assigning task for 3 seconds @@ -213,6 +228,11 @@ public class TaskRunner extends AbstractService { if(stopped) { break; } + + if(callFuture.getController().failed()){ + LOG.error(callFuture.getController().errorText()); + break; + } // if there has been no assigning task for a given period, // TaskRunner will retry to request an assigning task. if (LOG.isDebugEnabled()) { @@ -262,6 +282,10 @@ public class TaskRunner extends AbstractService { taskRequest = null; } } + } else { + stop(); + //notify to TaskRunnerManager + getContext().stopTaskRunner(getId()); } } catch (Throwable t) { LOG.fatal(t.getMessage(), t); http://git-wip-us.apache.org/repos/asf/tajo/blob/9e026a9a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java index 949aa58..90ee58a 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RemoteCallException.java @@ -41,6 +41,9 @@ public class RemoteCallException extends RemoteException { public RemoteCallException(int seqId, Throwable t) { super(t); this.seqId = seqId; + if (t != null) { + originExceptionClass = t.getClass().getCanonicalName(); + } } public RpcResponse getResponse() {
