TAJO-1509: Use a dedicated thread to release resources allocated to containers

Closes #501


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d57b16fe
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d57b16fe
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d57b16fe

Branch: refs/heads/index_support
Commit: d57b16fe68986d42a143da87162d89a672aec792
Parents: daf55ea
Author: navis.ryu <[email protected]>
Authored: Mon Apr 13 17:52:51 2015 +0900
Committer: Hyoungjun Kim <[email protected]>
Committed: Mon Apr 13 17:52:51 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../planner/physical/ExternalSortExec.java      |  2 +-
 .../org/apache/tajo/master/ContainerProxy.java  | 12 ++-
 .../apache/tajo/master/TajoContainerProxy.java  | 40 +++------
 .../tajo/worker/TajoResourceAllocator.java      | 93 ++++++++++++++------
 .../java/org/apache/tajo/rpc/NullCallback.java  |  9 +-
 6 files changed, 99 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/d57b16fe/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1ae67a5..d7977bb 100644
--- a/CHANGES
+++ b/CHANGES
@@ -13,6 +13,9 @@ Release 0.11.0 - unreleased
     (jihun)
 
   IMPROVEMENT
+  
+    TAJO-1509: Use dedicated thread to release resource allocated to container.
+    (Contributed by navis, Committed by hyoungjun)
 
     TAJO-1454: Comparing two date or two timestamp need not normalizing
     (Contributed by navis, Committed by hyoungjun)

http://git-wip-us.apache.org/repos/asf/tajo/blob/d57b16fe/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 355f015..3da296c 100644
--- 
a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ 
b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -184,7 +184,7 @@ public class ExternalSortExec extends SortExec {
 
     info(LOG, "Chunk #" + chunkId + " sort and written (" +
         FileUtil.humanReadableByteCount(appender.getOffset(), false) + " 
bytes, " + rowNum + " rows, " +
-        ", sort time: " + (sortEnd - sortStart) + " msec, " +
+        "sort time: " + (sortEnd - sortStart) + " msec, " +
         "write time: " + (chunkWriteEnd - chunkWriteStart) + " msec)");
     return outputPath;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d57b16fe/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java 
b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
index 562790d..cad63a0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/ContainerProxy.java
@@ -46,7 +46,7 @@ public abstract class ContainerProxy {
   protected ContainerState state;
   // store enough information to be able to cleanup the container
   protected TajoContainer container;
-  protected TajoContainerId containerID;
+  protected TajoContainerId containerId;
   protected String hostName;
   protected int port = -1;
 
@@ -60,7 +60,7 @@ public abstract class ContainerProxy {
     this.state = ContainerState.PREP;
     this.container = container;
     this.executionBlockId = executionBlockId;
-    this.containerID = container.getId();
+    this.containerId = container.getId();
   }
 
   public synchronized boolean isCompletelyDone() {
@@ -75,7 +75,11 @@ public abstract class ContainerProxy {
     return this.port;
   }
 
-  public String getId() {
-    return executionBlockId.toString();
+  public TajoContainerId getContainerId() {
+    return containerId;
+  }
+
+  public ExecutionBlockId getBlockId() {
+    return executionBlockId;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d57b16fe/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java 
b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 7ed9fc5..139359c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -23,9 +23,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -43,6 +41,7 @@ import org.apache.tajo.worker.TajoWorker;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 public class TajoContainerProxy extends ContainerProxy {
@@ -61,14 +60,14 @@ public class TajoContainerProxy extends ContainerProxy {
 
   @Override
   public synchronized void launch(ContainerLaunchContext 
containerLaunchContext) {
-    context.getResourceAllocator().addContainer(containerID, this);
+    context.getResourceAllocator().addContainer(containerId, this);
 
     this.hostName = container.getNodeId().getHost();
     this.port = 
((TajoWorkerContainer)container).getWorkerResource().getConnectionInfo().getPullServerPort();
     this.state = ContainerState.RUNNING;
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Launch Container:" + executionBlockId + "," + 
containerID.getId() + "," +
+      LOG.debug("Launch Container:" + executionBlockId + "," + 
containerId.getId() + "," +
           container.getId() + "," + container.getNodeId() + ", pullServer=" + 
port);
     }
 
@@ -127,43 +126,32 @@ public class TajoContainerProxy extends ContainerProxy {
   @Override
   public synchronized void stopContainer() {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + 
containerID + ", state:" + this.state);
+      LOG.debug("Release TajoWorker Resource: " + executionBlockId + "," + 
containerId + ", state:" + this.state);
     }
     if(isCompletelyDone()) {
-      LOG.info("Container already stopped:" + containerID);
+      LOG.info("Container already stopped:" + containerId);
       return;
     }
     if(this.state == ContainerState.PREP) {
       this.state = ContainerState.KILLED_BEFORE_LAUNCH;
     } else {
       try {
-        TajoWorkerContainer tajoWorkerContainer = 
((TajoWorkerContainer)container);
-        releaseWorkerResource(context, executionBlockId, 
tajoWorkerContainer.getId());
-        context.getResourceAllocator().removeContainer(containerID);
-        this.state = ContainerState.DONE;
+        releaseWorkerResource(context, executionBlockId, 
Arrays.asList(containerId));
+        context.getResourceAllocator().removeContainer(containerId);
       } catch (Throwable t) {
         // ignore the cleanup failure
         String message = "cleanup failed for container "
-            + this.containerID + " : "
+            + this.containerId + " : "
             + StringUtils.stringifyException(t);
         LOG.warn(message);
+      } finally {
         this.state = ContainerState.DONE;
-        return;
       }
     }
   }
 
   public static void 
releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
                                            ExecutionBlockId executionBlockId,
-                                           TajoContainerId containerId) throws 
Exception {
-    List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>();
-    containerIds.add(containerId);
-
-    releaseWorkerResource(context, executionBlockId, containerIds);
-  }
-
-  public static void 
releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext context,
-                                           ExecutionBlockId executionBlockId,
                                            List<TajoContainerId> containerIds) 
throws Exception {
     List<ContainerProtocol.TajoContainerIdProto> containerIdProtos =
         new ArrayList<ContainerProtocol.TajoContainerIdProto>();
@@ -181,12 +169,10 @@ public class TajoContainerProxy extends ContainerProxy {
       QueryCoordinatorProtocol.QueryCoordinatorProtocolService 
masterClientService = tmClient.getStub();
         masterClientService.releaseWorkerResource(null,
             QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder()
-              .setExecutionBlockId(executionBlockId.getProto())
-              .addAllContainerIds(containerIdProtos)
-              .build(),
-          NullCallback.get());
-    } catch (Throwable e) {
-      LOG.error(e.getMessage(), e);
+                .setExecutionBlockId(executionBlockId.getProto())
+                .addAllContainerIds(containerIdProtos)
+                .build(),
+            NullCallback.get());
     } finally {
       connPool.releaseConnection(tmClient);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d57b16fe/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git 
a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 47a9fda..6798c33 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.worker;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,7 +31,6 @@ import 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
@@ -51,13 +53,16 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.net.InetSocketAddress;
 import java.util.*;
+import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,14 +72,16 @@ public class TajoResourceAllocator extends 
AbstractResourceAllocator {
 
   private TajoConf tajoConf;
   private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
-  private final ExecutorService executorService;
+  private final ExecutorService allocationExecutor;
+  private final Deallocator deallocator;
 
   private AtomicBoolean stopped = new AtomicBoolean(false);
 
   public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext 
queryTaskContext) {
     this.queryTaskContext = queryTaskContext;
-    executorService = Executors.newFixedThreadPool(
+    allocationExecutor = Executors.newFixedThreadPool(
       
queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+    deallocator = new Deallocator();
   }
 
   @Override
@@ -115,16 +122,19 @@ public class TajoResourceAllocator extends 
AbstractResourceAllocator {
 
     
queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, 
new TajoWorkerAllocationHandler());
 
+    deallocator.start();
+
     super.init(conf);
   }
 
   @Override
   public synchronized void stop() {
-    if (stopped.getAndSet(true)) {
+    if (stopped.compareAndSet(false, true)) {
       return;
     }
 
-    executorService.shutdownNow();
+    allocationExecutor.shutdownNow();
+    deallocator.shutdown();
 
     Map<TajoContainerId, ContainerProxy> containers = 
queryTaskContext.getResourceAllocator()
       .getContainers();
@@ -168,7 +178,7 @@ public class TajoResourceAllocator extends 
AbstractResourceAllocator {
     for(TajoContainer eachContainer: event.getContainers()) {
       TajoContainerProxy containerProxy = new 
TajoContainerProxy(queryTaskContext, tajoConf,
         eachContainer, event.getQueryContext(), event.getExecutionBlockId(), 
event.getPlanJson());
-      executorService.submit(new LaunchRunner(eachContainer.getId(), 
containerProxy));
+      allocationExecutor.submit(new LaunchRunner(eachContainer.getId(), 
containerProxy));
     }
   }
 
@@ -180,7 +190,7 @@ public class TajoResourceAllocator extends 
AbstractResourceAllocator {
     }
 
     for (final NodeId worker : workers) {
-      executorService.submit(new Runnable() {
+      allocationExecutor.submit(new Runnable() {
         @Override
         public void run() {
           stopExecutionBlock(executionBlockId, worker);
@@ -196,7 +206,8 @@ public class TajoResourceAllocator extends 
AbstractResourceAllocator {
       tajoWorkerRpc = RpcConnectionPool.getPool().getConnection(addr, 
TajoWorkerProtocol.class, true);
       TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = 
tajoWorkerRpc.getStub();
 
-      tajoWorkerRpcClient.stopExecutionBlock(null, 
executionBlockId.getProto(), NullCallback.get());
+      tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(),
+          NullCallback.get(PrimitiveProtos.BoolProto.class));
     } catch (Throwable e) {
       LOG.error(e.getMessage(), e);
     } finally {
@@ -221,33 +232,62 @@ public class TajoResourceAllocator extends 
AbstractResourceAllocator {
   }
 
   private void stopContainers(Collection<TajoContainer> containers) {
-    for (TajoContainer container : containers) {
-      final ContainerProxy proxy = 
queryTaskContext.getResourceAllocator().getContainer(container.getId());
-      executorService.submit(new StopContainerRunner(container.getId(), 
proxy));
-    }
+    deallocator.submit(Iterables.transform(containers, new 
Function<TajoContainer, TajoContainerId>() {
+      public TajoContainerId apply(TajoContainer input) { return 
input.getId(); }
+    }));
   }
 
-  private static class StopContainerRunner implements Runnable {
-    private final ContainerProxy proxy;
-    private final TajoContainerId id;
-    public StopContainerRunner(TajoContainerId id, ContainerProxy proxy) {
-      this.id = id;
-      this.proxy = proxy;
+  private static final TajoContainerId FIN = new TajoWorkerContainerId();
+
+  private class Deallocator extends Thread {
+
+    private final BlockingDeque<TajoContainerId> queue = new 
LinkedBlockingDeque<TajoContainerId>();
+
+    public Deallocator() {
+      setName("Deallocator");
+      setDaemon(true);
+    }
+
+    private void submit(Iterable<TajoContainerId> container) {
+      queue.addAll(Lists.newArrayList(container));
+    }
+
+    private void shutdown() {
+      queue.add(FIN);
     }
 
     @Override
     public void run() {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("ContainerProxy stopped:" + id + "," + proxy.getId());
+      final AbstractResourceAllocator allocator = 
queryTaskContext.getResourceAllocator();
+      while (!stopped.get() || !queue.isEmpty()) {
+        TajoContainerId containerId;
+        try {
+          containerId = queue.take();
+        } catch (InterruptedException e) {
+          continue;
+        }
+        if (containerId == FIN) {
+          break;
+        }
+        ContainerProxy proxy = allocator.getContainer(containerId);
+        if (proxy == null) {
+          continue;
+        }
+        try {
+          LOG.info("Stopping ContainerProxy: " + proxy.getContainerId() + "," 
+ proxy.getBlockId());
+          proxy.stopContainer();
+        } catch (Exception e) {
+          LOG.warn("Failed to stop container " + proxy.getContainerId() + "," 
+ proxy.getBlockId(), e);
+        }
       }
-      proxy.stopContainer();
+      LOG.info("Deallocator exiting");
     }
   }
 
   class TajoWorkerAllocationHandler implements 
EventHandler<ContainerAllocationEvent> {
     @Override
     public void handle(ContainerAllocationEvent event) {
-      executorService.submit(new TajoWorkerAllocationThread(event));
+      allocationExecutor.submit(new TajoWorkerAllocationThread(event));
     }
   }
 
@@ -341,13 +381,14 @@ public class TajoResourceAllocator extends 
AbstractResourceAllocator {
 
         StageState state = 
queryTaskContext.getStage(executionBlockId).getSynchronizedState();
         if (!Stage.isRunningState(state)) {
+          List<TajoContainerId> containerIds = new 
ArrayList<TajoContainerId>();
+          for(TajoContainer eachContainer: containers) {
+            containerIds.add(eachContainer.getId());
+          }
           try {
-            List<TajoContainerId> containerIds = new 
ArrayList<TajoContainerId>();
-            for(TajoContainer eachContainer: containers) {
-              containerIds.add(eachContainer.getId());
-            }
             TajoContainerProxy.releaseWorkerResource(queryTaskContext, 
executionBlockId, containerIds);
           } catch (Throwable e) {
+            deallocator.submit(containerIds);
             LOG.error(e.getMessage(), e);
           }
           return;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d57b16fe/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
index 9b7f8ac..896a02e 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NullCallback.java
@@ -20,7 +20,7 @@ package org.apache.tajo.rpc;
 
 import com.google.protobuf.RpcCallback;
 
-public class NullCallback implements RpcCallback<Object> {
+public class NullCallback<T> implements RpcCallback<T> {
   private final static NullCallback instance;
 
   static {
@@ -31,8 +31,13 @@ public class NullCallback implements RpcCallback<Object> {
     return instance;
   }
 
+  @SuppressWarnings("unchecked")
+  public static <T> RpcCallback<T> get(Class<T> clazz) {
+    return (RpcCallback<T>)instance;
+  }
+
   @Override
-  public void run(Object parameter) {
+  public void run(T parameter) {
     // do nothing
   }
 }

Reply via email to