Repository: tajo
Updated Branches:
  refs/heads/master 23f81373f -> 60858b25c


TAJO-985: Client API should be non-blocking. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/60858b25
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/60858b25
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/60858b25

Branch: refs/heads/master
Commit: 60858b25c46fceedf70b8583f1d7ef1383a19c6b
Parents: 23f8137
Author: jhkim <[email protected]>
Authored: Mon Aug 18 13:25:02 2014 +0900
Committer: jhkim <[email protected]>
Committed: Mon Aug 18 13:25:02 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../main/java/org/apache/tajo/cli/TajoCli.java  | 10 +--
 .../java/org/apache/tajo/client/TajoAdmin.java  |  2 +-
 .../java/org/apache/tajo/client/TajoClient.java | 26 ++++--
 .../apache/tajo/master/querymaster/Query.java   | 25 ++++--
 .../tajo/master/querymaster/QueryMaster.java    | 14 ++++
 .../master/querymaster/QueryMasterTask.java     |  4 +-
 .../tajo/master/querymaster/SubQuery.java       | 85 +++++++++++++-------
 .../tajo/worker/TajoResourceAllocator.java      |  2 +-
 .../tajo/worker/TajoWorkerClientService.java    | 12 +--
 .../org/apache/tajo/client/TestTajoClient.java  |  2 +-
 11 files changed, 124 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 14eb1e3..dee8fb5 100644
--- a/CHANGES
+++ b/CHANGES
@@ -112,6 +112,8 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-985: Client API should be non-blocking. (jinho)
+
     TAJO-1006: Fix wrong storage unit for kilo bytes and others. (hyunsik)
 
     TAJO-1000: TextDatum.asChar() is incorrect, if client charset is different.

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java 
b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 81427ab..fdc766e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -549,22 +549,20 @@ public class TajoCli {
       while (true) {
         // TODO - configurable
         status = client.getQueryStatus(queryId);
-        if(status.getState() == QueryState.QUERY_MASTER_INIT || 
status.getState() == QueryState.QUERY_MASTER_LAUNCHED) {
+        if(TajoClient.isInPreNewState(status.getState())) {
           Thread.sleep(Math.min(20 * initRetries, 1000));
           initRetries++;
           continue;
         }
 
-        if (status.getState() == QueryState.QUERY_RUNNING || status.getState() 
== QueryState.QUERY_SUCCEEDED) {
+        if (TajoClient.isInRunningState(status.getState()) || 
status.getState() == QueryState.QUERY_SUCCEEDED) {
           displayFormatter.printProgress(sout, status);
         }
 
-        if (status.getState() != QueryState.QUERY_RUNNING &&
-            status.getState() != QueryState.QUERY_NOT_ASSIGNED &&
-            status.getState() != QueryState.QUERY_KILL_WAIT) {
+        if (TajoClient.isInCompleteState(status.getState()) && 
status.getState() != QueryState.QUERY_KILL_WAIT) {
           break;
         } else {
-          Thread.sleep(Math.min(200 * progressRetries, 1000));
+          Thread.sleep(Math.min(100 * progressRetries, 1000));
           progressRetries += 2;
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java 
b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 4f38858..58b7184 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -194,7 +194,7 @@ public class TajoAdmin {
         long end = queryInfo.getFinishTime();
         long start = queryInfo.getStartTime();
         String executionTime = decimalF.format((end-start) / 1000) + " sec";
-        if (!TajoClient.isQueryRunnning(queryInfo.getState())) {
+        if (TajoClient.isInCompleteState(queryInfo.getState())) {
           writer.write("Finished Time: " + 
df.format(queryInfo.getFinishTime()));
           writer.write("\n");
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java 
b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 333c8d6..3a90a48 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -446,12 +446,26 @@ public class TajoClient implements Closeable {
     return new QueryStatus(res);
   }
 
-  public static boolean isQueryRunnning(QueryState state) {
-    return state == QueryState.QUERY_NEW ||
-        state == QueryState.QUERY_RUNNING ||
-        state == QueryState.QUERY_MASTER_LAUNCHED ||
+  /* query submit */
+  public static boolean isInPreNewState(QueryState state) {
+    return state == QueryState.QUERY_NOT_ASSIGNED ||
         state == QueryState.QUERY_MASTER_INIT ||
-        state == QueryState.QUERY_NOT_ASSIGNED;
+        state == QueryState.QUERY_MASTER_LAUNCHED;
+  }
+
+  /* query submitted. but is not running */
+  public static boolean isInInitState(QueryState state) {
+    return  state == QueryState.QUERY_NEW || state == QueryState.QUERY_INIT;
+  }
+
+  /* query started. but is not complete */
+  public static boolean isInRunningState(QueryState state) {
+    return isInInitState(state) || state == QueryState.QUERY_RUNNING;
+  }
+
+  /* query complete */
+  public static boolean isInCompleteState(QueryState state) {
+    return !isInPreNewState(state) && !isInRunningState(state);
   }
 
   public ResultSet getQueryResult(QueryId queryId)
@@ -507,7 +521,7 @@ public class TajoClient implements Closeable {
     }
     QueryStatus status = getQueryStatus(queryId);
 
-    while(status != null && isQueryRunnning(status.getState())) {
+    while(status != null && !isInCompleteState(status.getState())) {
       try {
         Thread.sleep(500);
       } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 8111ef6..4f4aaab 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -94,6 +94,7 @@ public class Query implements EventHandler<QueryEvent> {
 
   // State Machine
   private final StateMachine<QueryState, QueryEventType, QueryEvent> 
stateMachine;
+  private QueryState queryState;
 
   // Transition Handler
   private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new 
InternalErrorTransition();
@@ -231,10 +232,11 @@ public class Query implements EventHandler<QueryEvent> {
     this.writeLock = readWriteLock.writeLock();
 
     stateMachine = stateMachineFactory.make(this);
+    queryState = stateMachine.getCurrentState();
   }
 
   public float getProgress() {
-    QueryState state = getStateMachine().getCurrentState();
+    QueryState state = getState();
     if (state == QueryState.QUERY_SUCCEEDED) {
       return 1.0f;
     } else {
@@ -253,6 +255,7 @@ public class Query implements EventHandler<QueryEvent> {
           }
         } else {
           subProgresses[idx] = 0.0f;
+          finished = false;
         }
         idx++;
       }
@@ -337,7 +340,7 @@ public class Query implements EventHandler<QueryEvent> {
     return this.subqueries.values();
   }
 
-  public QueryState getState() {
+  public QueryState getSynchronizedState() {
     readLock.lock();
     try {
       return stateMachine.getCurrentState();
@@ -346,6 +349,11 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
+  /* non-blocking call for client API */
+  public QueryState getState() {
+    return queryState;
+  }
+
   public ExecutionBlockCursor getExecutionBlockCursor() {
     return cursor;
   }
@@ -783,13 +791,13 @@ public class Query implements EventHandler<QueryEvent> {
           query.erroredSubQueryCount++;
         } else {
           LOG.error(String.format("Invalid SubQuery (%s) State %s at %s",
-              castEvent.getExecutionBlockId().toString(), 
castEvent.getState().name(), query.getState().name()));
+              castEvent.getExecutionBlockId().toString(), 
castEvent.getState().name(), query.getSynchronizedState().name()));
           query.eventHandler.handle(new QueryEvent(event.getQueryId(), 
QueryEventType.INTERNAL_ERROR));
         }
 
         // if a subquery is succeeded and a query is running
         if (castEvent.getState() == SubQueryState.SUCCEEDED &&  // latest 
subquery succeeded
-            query.getState() == QueryState.QUERY_RUNNING &&     // current 
state is not in KILL_WAIT, FAILED, or ERROR.
+            query.getSynchronizedState() == QueryState.QUERY_RUNNING &&     // 
current state is not in KILL_WAIT, FAILED, or ERROR.
             hasNext(query)) {                                   // there 
remains at least one subquery.
           executeNextBlock(query);
         } else { // if a query is completed due to finished, kill, failure, or 
error
@@ -842,21 +850,22 @@ public class Query implements EventHandler<QueryEvent> {
     LOG.info("Processing " + event.getQueryId() + " of type " + 
event.getType());
     try {
       writeLock.lock();
-      QueryState oldState = getState();
+      QueryState oldState = getSynchronizedState();
       try {
         getStateMachine().doTransition(event.getType(), event);
+        queryState = getSynchronizedState();
       } catch (InvalidStateTransitonException e) {
         LOG.error("Can't handle this event at current state"
             + ", type:" + event
             + ", oldState:" + oldState.name()
-            + ", nextState:" + getState().name()
+            + ", nextState:" + getSynchronizedState().name()
             , e);
         eventHandler.handle(new QueryEvent(this.id, 
QueryEventType.INTERNAL_ERROR));
       }
 
       //notify the eventhandler of state change
-      if (oldState != getState()) {
-        LOG.info(id + " Query Transitioned from " + oldState + " to " + 
getState());
+      if (oldState != getSynchronizedState()) {
+        LOG.info(id + " Query Transitioned from " + oldState + " to " + 
getSynchronizedState());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index aed69b2..88589f9 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -53,6 +53,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -95,6 +97,8 @@ public class QueryMaster extends CompositeService implements 
EventHandler {
 
   private RpcConnectionPool connPool;
 
+  private ExecutorService eventExecutor;
+
   public QueryMaster(TajoWorker.WorkerContext workerContext) {
     super(QueryMaster.class.getName());
     this.workerContext = workerContext;
@@ -140,6 +144,7 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     finishedQueryMasterTaskCleanThread = new 
FinishedQueryMasterTaskCleanThread();
     finishedQueryMasterTaskCleanThread.start();
 
+    eventExecutor = Executors.newSingleThreadExecutor();
     super.start();
   }
 
@@ -160,6 +165,11 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
     if(finishedQueryMasterTaskCleanThread != null) {
       finishedQueryMasterTaskCleanThread.interrupt();
     }
+
+    if(eventExecutor != null){
+      eventExecutor.shutdown();
+    }
+
     super.stop();
 
     LOG.info("QueryMaster stop");
@@ -312,6 +322,10 @@ public class QueryMaster extends CompositeService 
implements EventHandler {
       return conf;
     }
 
+    public ExecutorService getEventExecutor(){
+      return eventExecutor;
+    }
+
     public TajoAsyncDispatcher getDispatcher() {
       return dispatcher;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 5885a1d..1889a56 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -296,7 +296,7 @@ public class QueryMasterTask extends CompositeService {
       QueryId queryId = event.getQueryId();
       LOG.info("Query completion notified from " + queryId);
 
-      while (!isTerminatedState(query.getState())) {
+      while (!isTerminatedState(query.getSynchronizedState())) {
         try {
           synchronized (this) {
             wait(10);
@@ -305,7 +305,7 @@ public class QueryMasterTask extends CompositeService {
           LOG.error(e);
         }
       }
-      LOG.info("Query final state: " + query.getState());
+      LOG.info("Query final state: " + query.getSynchronizedState());
       queryMasterContext.stopQuery(queryId);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java 
b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index b6fe9da..8eff8a4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -87,6 +87,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private AbstractTaskScheduler taskScheduler;
   private QueryMasterTask.QueryMasterTaskContext context;
   private final List<String> diagnostics = new ArrayList<String>();
+  private SubQueryState subQueryState;
 
   private long startTime;
   private long finishTime;
@@ -261,6 +262,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();
     stateMachine = stateMachineFactory.make(this);
+    subQueryState = stateMachine.getCurrentState();
   }
 
   public static boolean isRunningState(SubQueryState state) {
@@ -323,7 +325,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     readLock.lock();
     try {
       if (getState() == SubQueryState.NEW) {
-        return 0;
+        return 0.0f;
       } else {
         tempTasks = new ArrayList<QueryUnit>(tasks.values());
       }
@@ -338,7 +340,11 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       }
     }
 
-    return totalProgress/(float)tempTasks.size();
+    if (totalProgress > 0.0f) {
+      return (float) Math.floor((totalProgress / (float) tempTasks.size()) * 
1000.0f) / 1000.0f;
+    } else {
+      return 0.0f;
+    }
   }
 
   public int getSucceededObjectCount() {
@@ -466,7 +472,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     return getId().compareTo(other.getId());
   }
 
-  public SubQueryState getState() {
+  public SubQueryState getSynchronizedState() {
     readLock.lock();
     try {
       return stateMachine.getCurrentState();
@@ -475,6 +481,11 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
     }
   }
 
+  /* non-blocking call for client API */
+  public SubQueryState getState() {
+    return subQueryState;
+  }
+
   public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) {
     TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()};
     long[] avgRows = new long[]{0, 0};
@@ -584,19 +595,20 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
   @Override
   public void handle(SubQueryEvent event) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getSubQueryId() + " of type " + 
event.getType() + ", preState=" + getState());
+      LOG.debug("Processing " + event.getSubQueryId() + " of type " + 
event.getType() + ", preState=" + getSynchronizedState());
     }
 
     try {
       writeLock.lock();
-      SubQueryState oldState = getState();
+      SubQueryState oldState = getSynchronizedState();
       try {
         getStateMachine().doTransition(event.getType(), event);
+        subQueryState = getSynchronizedState();
       } catch (InvalidStateTransitonException e) {
         LOG.error("Can't handle this event at current state"
             + ", eventType:" + event.getType().name()
             + ", oldState:" + oldState.name()
-            + ", nextState:" + getState().name()
+            + ", nextState:" + getSynchronizedState().name()
             , e);
         eventHandler.handle(new SubQueryEvent(getId(),
             SubQueryEventType.SQ_INTERNAL_ERROR));
@@ -604,9 +616,9 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
 
       // notify the eventhandler of state change
       if (LOG.isDebugEnabled()) {
-        if (oldState != getState()) {
+        if (oldState != getSynchronizedState()) {
           LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to 
"
-              + getState());
+              + getSynchronizedState());
         }
       }
     } finally {
@@ -622,7 +634,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
       SubQueryEvent, SubQueryState> {
 
     @Override
-    public SubQueryState transition(SubQuery subQuery, SubQueryEvent 
subQueryEvent) {
+    public SubQueryState transition(final SubQuery subQuery, SubQueryEvent 
subQueryEvent) {
       subQuery.setStartTime();
       ExecutionBlock execBlock = subQuery.getBlock();
       SubQueryState state;
@@ -633,24 +645,39 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
           subQuery.finalizeStats();
           state = SubQueryState.SUCCEEDED;
         } else {
-          ExecutionBlock parent = 
subQuery.getMasterPlan().getParent(subQuery.getBlock());
-          DataChannel channel = 
subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
-          setShuffleIfNecessary(subQuery, channel);
-          initTaskScheduler(subQuery);
-          schedule(subQuery);
-          subQuery.totalScheduledObjectsCount = 
subQuery.getTaskScheduler().remainingScheduledObjectNum();
-          LOG.info(subQuery.totalScheduledObjectsCount + " objects are 
scheduled");
-
-          if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) 
{ // if there is no tasks
-            subQuery.stopScheduler();
-            subQuery.finalizeStats();
-            subQuery.eventHandler.handle(new 
SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED));
-            return SubQueryState.SUCCEEDED;
-          } else {
-            subQuery.taskScheduler.start();
-            allocateContainers(subQuery);
-            return SubQueryState.INITED;
-          }
+          // execute pre-processing asyncronously
+          subQuery.getContext().getQueryMasterContext().getEventExecutor()
+              .submit(new Runnable() {
+                        @Override
+                        public void run() {
+                          try {
+                            ExecutionBlock parent = 
subQuery.getMasterPlan().getParent(subQuery.getBlock());
+                            DataChannel channel = 
subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId());
+                            setShuffleIfNecessary(subQuery, channel);
+                            initTaskScheduler(subQuery);
+                            schedule(subQuery);
+                            subQuery.totalScheduledObjectsCount = 
subQuery.getTaskScheduler().remainingScheduledObjectNum();
+                            LOG.info(subQuery.totalScheduledObjectsCount + " 
objects are scheduled");
+
+                            if 
(subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there 
is no tasks
+                              subQuery.stopScheduler();
+                              subQuery.finalizeStats();
+                              subQuery.eventHandler.handle(new 
SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED));
+                            } else {
+                              subQuery.taskScheduler.start();
+                              allocateContainers(subQuery);
+
+                            }
+                          } catch (Exception e) {
+                            LOG.error("SubQuery (" + subQuery.getId() + ") 
ERROR: ", e);
+                            subQuery.setFinishTime();
+                            subQuery.eventHandler.handle(new 
SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage()));
+                            subQuery.eventHandler.handle(new 
SubQueryCompletedEvent(subQuery.getId(), SubQueryState.ERROR));
+                          }
+                        }
+                      }
+              );
+          state = SubQueryState.INITED;
         }
       } catch (Exception e) {
         LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e);
@@ -863,7 +890,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
         long aggregatedVolume = 0;
         for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) {
           SubQuery subquery = context.getSubQuery(childBlock.getId());
-          if (subquery == null || subquery.getState() != 
SubQueryState.SUCCEEDED) {
+          if (subquery == null || subquery.getSynchronizedState() != 
SubQueryState.SUCCEEDED) {
             aggregatedVolume += getInputVolume(masterPlan, context, 
childBlock);
           } else {
             aggregatedVolume += subquery.getResultStats().getNumBytes();
@@ -1145,7 +1172,7 @@ public class SubQuery implements 
EventHandler<SubQueryEvent> {
             subQuery.abort(SubQueryState.KILLED);
             return SubQueryState.KILLED;
           } else {
-            LOG.error("Invalid State " + subQuery.getState() + " State");
+            LOG.error("Invalid State " + subQuery.getSynchronizedState() + " 
State");
             subQuery.abort(SubQueryState.ERROR);
             return SubQueryState.ERROR;
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 79f14a2..e09a69e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -300,7 +300,7 @@ public class TajoResourceAllocator extends 
AbstractResourceAllocator {
           containers.add(container);
         }
 
-        SubQueryState state = 
queryTaskContext.getSubQuery(executionBlockId).getState();
+        SubQueryState state = 
queryTaskContext.getSubQuery(executionBlockId).getSynchronizedState();
         if (!SubQuery.isRunningState(state)) {
           try {
             List<ContainerId> containerIds = new ArrayList<ContainerId>();

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index abd4e98..d25013c 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -133,7 +133,7 @@ public class TajoWorkerClientService extends 
AbstractService {
             RpcController controller,
             ClientProtos.GetQueryResultRequest request) throws 
ServiceException {
       QueryId queryId = new QueryId(request.getQueryId());
-      Query query = workerContext.getQueryMaster().getQueryMasterTask(queryId, 
true).getQuery();
+      QueryMasterTask queryMasterTask = 
workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
 
       ClientProtos.GetQueryResultResponse.Builder builder = 
ClientProtos.GetQueryResultResponse.newBuilder();
       try {
@@ -142,12 +142,12 @@ public class TajoWorkerClientService extends 
AbstractService {
         LOG.warn("Can't get current user name");
       }
 
-      if(query == null) {
+      if(queryMasterTask == null || queryMasterTask.getQuery() == null) {
         builder.setErrorMessage("No Query for " + queryId);
       } else {
-        switch (query.getState()) {
+        switch (queryMasterTask.getState()) {
           case QUERY_SUCCEEDED:
-            builder.setTableDesc(query.getResultDesc().getProto());
+            
builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto());
             break;
           case QUERY_FAILED:
           case QUERY_ERROR:
@@ -196,10 +196,10 @@ public class TajoWorkerClientService extends 
AbstractService {
         Query query = queryMasterTask.getQuery();
 
         if (query != null) {
-          builder.setState(query.getState());
+          builder.setState(queryMasterTask.getState());
           builder.setProgress(query.getProgress());
           builder.setSubmitTime(query.getAppSubmitTime());
-          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+          if (queryMasterTask.getState() == 
TajoProtos.QueryState.QUERY_SUCCEEDED) {
             builder.setFinishTime(query.getFinishTime());
           } else {
             builder.setFinishTime(System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/tajo/blob/60858b25/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java 
b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 3d6ed2b..2e69666 100644
--- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -658,7 +658,7 @@ public class TestTajoClient {
 
       QueryStatus queryStatus = client.getQueryStatus(queryId);
       assertNotNull(queryStatus);
-      assertTrue(!TajoClient.isQueryRunnning(queryStatus.getState()));
+      assertTrue(TajoClient.isInCompleteState(queryStatus.getState()));
 
       TajoResultSet resultSet = (TajoResultSet) client.getQueryResult(queryId);
       assertNotNull(resultSet);

Reply via email to