Repository: tajo
Updated Branches:
  refs/heads/master aed97a8a5 -> 839081ac8


TAJO-873: Query status is still RUNNING after session expired. (Hyoungjun Kim, jinho)

Closes #37


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/839081ac
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/839081ac
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/839081ac

Branch: refs/heads/master
Commit: 839081ac8a752de899ddeb0bcd15866722ae6c27
Parents: aed97a8
Author: jhkim <[email protected]>
Authored: Fri Aug 22 20:53:12 2014 +0900
Committer: jhkim <[email protected]>
Committed: Fri Aug 22 20:53:12 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../tajo/master/querymaster/QueryMaster.java    |  4 +-
 .../master/querymaster/QueryMasterTask.java     | 26 +++++++----
 .../org/apache/tajo/TajoTestingCluster.java     | 31 +++++++++++++
 .../tajo/scheduler/TestFifoScheduler.java       | 47 ++++++++++----------
 5 files changed, 76 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index f0734f2..f2798b4 100644
--- a/CHANGES
+++ b/CHANGES
@@ -124,6 +124,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-873: Query status is still RUNNING after session expired.
+    (Hyoungjun Kim, jinho)
+
     TAJO-1004: UniformRangePartition cannot deal with unicode ranges.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 9f90b05..deadd39 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -579,8 +579,8 @@ public class QueryMaster extends CompositeService implements EventHandler {
               long lastHeartbeat = eachTask.getLastClientHeartbeat();
               long time = System.currentTimeMillis() - lastHeartbeat;
               if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) {
-                LOG.warn("Query " + eachTask.getQueryId() + " stopped cause 
query sesstion timeout: " + time + " ms");
-                eachTask.expiredSessionTimeout();
+                LOG.warn("Query " + eachTask.getQueryId() + " stopped cause 
query session timeout: " + time + " ms");
+                eachTask.expireQuerySession();
               }
             } catch (Exception e) {
               LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index ce329fb..1ffaf56 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -187,6 +187,12 @@ public class QueryMasterTask extends CompositeService {
 
     LOG.info("Stopping QueryMasterTask:" + queryId);
 
+    try {
+      resourceAllocator.stop();
+    } catch (Throwable t) {
+      LOG.fatal(t.getMessage(), t);
+    }
+
     CallFuture future = new CallFuture();
 
     RpcConnectionPool connPool = 
RpcConnectionPool.getPool(queryMasterContext.getConf());
@@ -328,14 +334,14 @@ public class QueryMasterTask extends CompositeService {
       LOG.info("Query final state: " + query.getSynchronizedState());
       queryMasterContext.stopQuery(queryId);
     }
+  }
 
-    private boolean isTerminatedState(QueryState state) {
-      return
-          state == QueryState.QUERY_SUCCEEDED ||
-          state == QueryState.QUERY_FAILED ||
-          state == QueryState.QUERY_KILLED ||
-          state == QueryState.QUERY_ERROR;
-    }
+  private static boolean isTerminatedState(QueryState state) {
+    return
+        state == QueryState.QUERY_SUCCEEDED ||
+        state == QueryState.QUERY_FAILED ||
+        state == QueryState.QUERY_KILLED ||
+        state == QueryState.QUERY_ERROR;
   }
 
   public synchronized void startQuery() {
@@ -462,8 +468,10 @@ public class QueryMasterTask extends CompositeService {
     return query;
   }
 
-  public void expiredSessionTimeout() {
-    stop();
+  protected void expireQuerySession() {
+    if(!isTerminatedState(query.getState()) && !(query.getState() == 
QueryState.QUERY_KILL_WAIT)){
+      query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+    }
   }
 
   public QueryMasterTaskContext getQueryTaskContext() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 948f018..7b87112 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -38,6 +38,7 @@ import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
@@ -677,4 +678,34 @@ public class TajoTestingCluster {
       eachWorker.getConfig().set(key, value);
     }
   }
+
+  public void waitForQueryRunning(QueryId queryId) throws Exception {
+    QueryMasterTask qmt = null;
+
+    int i = 0;
+    while (qmt == null || TajoClient.isInPreNewState(qmt.getState())) {
+      try {
+        Thread.sleep(100);
+        if(qmt == null){
+          qmt = getQueryMasterTask(queryId);
+        }
+      } catch (InterruptedException e) {
+      }
+      if (++i > 100) {
+        throw new IOException("Timed out waiting for query to start");
+      }
+    }
+  }
+
+  public QueryMasterTask getQueryMasterTask(QueryId queryId) {
+    QueryMasterTask qmt = null;
+    for (TajoWorker worker : getTajoWorkers()) {
+      qmt = 
worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId);
+      if (qmt != null) {
+        break;
+      }
+    }
+
+    return qmt;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/839081ac/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
index 76f22d0..069ee27 100644
--- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.scheduler;
 
-import com.google.protobuf.ServiceException;
 import org.apache.tajo.*;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.conf.TajoConf;
@@ -28,7 +27,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
 import java.sql.ResultSet;
 
 import static org.junit.Assert.*;
@@ -52,51 +50,52 @@ public class TestFifoScheduler {
   }
 
   @Test
-  public final void testKillScheduledQuery() throws IOException, 
ServiceException, InterruptedException {
-    ClientProtos.SubmitQueryResponse res = client.executeQuery("select 
sleep(2) from lineitem");
-    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select 
sleep(2) from lineitem");
-    Thread.sleep(1000);
+  public final void testKillScheduledQuery() throws Exception {
+    ClientProtos.SubmitQueryResponse res = client.executeQuery("select 
sleep(1) from lineitem");
+    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select 
sleep(1) from lineitem");
     QueryId queryId = new QueryId(res.getQueryId());
     QueryId queryId2 = new QueryId(res2.getQueryId());
-    assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, 
client.getQueryStatus(queryId2).getState());
 
+    cluster.waitForQueryRunning(queryId);
     client.killQuery(queryId2);
     assertEquals(TajoProtos.QueryState.QUERY_KILLED, 
client.getQueryStatus(queryId2).getState());
-    client.killQuery(queryId);
-    assertEquals(TajoProtos.QueryState.QUERY_KILLED, 
client.getQueryStatus(queryId).getState());
+
+    client.killQuery(queryId); // cleanup
   }
 
   @Test
-  public final void testForwardedQuery() throws IOException, ServiceException, 
InterruptedException {
-    ClientProtos.SubmitQueryResponse res = client.executeQuery("select 
sleep(2) from lineitem");
+  public final void testForwardedQuery() throws Exception {
+    ClientProtos.SubmitQueryResponse res = client.executeQuery("select 
sleep(1) from lineitem");
     ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from 
lineitem limit 1");
-
-    Thread.sleep(1000);
+    assertTrue(res.getIsForwarded());
     assertFalse(res2.getIsForwarded());
+
+    QueryId queryId = new QueryId(res.getQueryId());
     QueryId queryId2 = new QueryId(res2.getQueryId());
+    cluster.waitForQueryRunning(queryId);
+
     assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, 
client.getQueryStatus(queryId2).getState());
     ResultSet resSet = TajoClient.createResultSet(client, res2);
     assertNotNull(resSet);
 
-    QueryId queryId = new QueryId(res.getQueryId());
-    assertEquals(TajoProtos.QueryState.QUERY_RUNNING, 
client.getQueryStatus(queryId).getState());
-    client.killQuery(queryId);
+    client.killQuery(queryId); //cleanup
   }
 
   @Test
-  public final void testScheduledQuery() throws IOException, ServiceException, 
InterruptedException {
-    ClientProtos.SubmitQueryResponse res = client.executeQuery("select 
sleep(2) from lineitem");
-    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select 
sleep(2) from lineitem");
-    ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select 
sleep(2) from lineitem");
-    ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select 
sleep(2) from lineitem");
-
-    Thread.sleep(1000);
+  public final void testScheduledQuery() throws Exception {
+    ClientProtos.SubmitQueryResponse res = client.executeQuery("select 
sleep(1) from lineitem");
+    ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select 
sleep(1) from lineitem");
+    ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select 
sleep(1) from lineitem");
+    ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select 
sleep(1) from lineitem");
 
     QueryId queryId = new QueryId(res.getQueryId());
     QueryId queryId2 = new QueryId(res2.getQueryId());
     QueryId queryId3 = new QueryId(res3.getQueryId());
     QueryId queryId4 = new QueryId(res4.getQueryId());
-    assertEquals(TajoProtos.QueryState.QUERY_RUNNING, 
client.getQueryStatus(queryId).getState());
+
+    cluster.waitForQueryRunning(queryId);
+
+    
assertTrue(TajoClient.isInRunningState(client.getQueryStatus(queryId).getState()));
 
     assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, 
client.getQueryStatus(queryId2).getState());
     assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, 
client.getQueryStatus(queryId3).getState());

Reply via email to