Repository: hive
Updated Branches:
  refs/heads/llap 6547c0cec -> cae3ec16b


http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
index ca04557..515bf3c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapNodeId.java
@@ -81,9 +81,6 @@ public class LlapNodeId {
 
   @Override
   public String toString() {
-    return "LlapNodeId{" +
-        "hostname='" + hostname + '\'' +
-        ", port=" + port +
-        '}';
+    return hostname + ":" + port;
   }
 }

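With this change a node id renders compactly as host:port, which is friendlier in log lines. A minimal illustration (hostname and port are made up; LlapNodeId.getInstance is the factory used elsewhere in this patch):

    LlapNodeId nodeId = LlapNodeId.getInstance("worker-1.example.com", 15551);
    // Before: LlapNodeId{hostname='worker-1.example.com', port=15551}
    // After:  worker-1.example.com:15551
    String rendered = nodeId.toString();
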
http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
index 82f3b59..f3ce33b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/ContainerRunner.java
@@ -16,12 +16,19 @@ package org.apache.hadoop.hive.llap.daemon;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 
 public interface ContainerRunner {
 
   void submitWork(SubmitWorkRequestProto request) throws IOException;
 
   void sourceStateUpdated(SourceStateUpdatedRequestProto request);
+
+  void queryComplete(QueryCompleteRequestProto request);
+
+  void terminateFragment(TerminateFragmentRequestProto request);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 061e875..c9e5829 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -14,7 +14,6 @@
 
 package org.apache.hadoop.hive.llap.daemon.impl;
 
-import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
@@ -26,19 +25,18 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -53,6 +51,7 @@ import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
 
 import com.google.common.base.Preconditions;
@@ -64,11 +63,11 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
   private static final Logger LOG = Logger.getLogger(ContainerRunnerImpl.class);
 
   private volatile AMReporter amReporter;
+  private final QueryTracker queryTracker;
   private final Scheduler<TaskRunnerCallable> executorService;
   private final AtomicReference<InetSocketAddress> localAddress;
   private final String[] localDirsBase;
   private final Map<String, String> localEnv = new HashMap<>();
-  private final FileSystem localFs;
   private final long memoryPerExecutor;
   private final LlapDaemonExecutorMetrics metrics;
   private final Configuration conf;
@@ -89,6 +88,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     this.localDirsBase = localDirsBase;
     this.localAddress = localAddress;
 
+    this.queryTracker = new QueryTracker(conf, localDirsBase);
    this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, enablePreemption);
     AuxiliaryServiceHelper.setServiceDataIntoEnv(
         TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID,
@@ -99,11 +99,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     this.memoryPerExecutor = (long)(totalMemoryAvailableBytes * 0.8 / (float) numExecutors);
     this.metrics = metrics;
 
-    try {
-      localFs = FileSystem.getLocal(conf);
-    } catch (IOException e) {
-      throw new RuntimeException("Failed to setup local filesystem instance", e);
-    }
     confParams = new TaskRunnerCallable.ConfParams(
         conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
             TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT),
@@ -135,19 +130,10 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
       amReporter.stop();
       amReporter = null;
     }
+    queryTracker.shutdown();
     super.serviceStop();
   }
 
-  // TODO Move this into a utilities class
-  private static String createAppSpecificLocalDir(String baseDir, String applicationIdString,
-                                                  String user) {
-    // TODO This is broken for secure clusters. The app will not have permission to create these directories.
-    // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler.
-    // TODO This should be the process user - and not the user on behalf of whom the query is being submitted.
-    return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
-        "appcache" + File.separator + applicationIdString;
-  }
-
   @Override
   public void submitWork(SubmitWorkRequestProto request) throws IOException {
      HistoryLogger.logFragmentStart(request.getApplicationIdString(), request.getContainerIdString(),
@@ -170,15 +156,20 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
       env.putAll(localEnv);
       env.put(ApplicationConstants.Environment.USER.name(), request.getUser());
 
-      String[] localDirs = new String[localDirsBase.length];
+      FragmentSpecProto fragmentSpec = request.getFragmentSpec();
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(
+          fragmentSpec.getTaskAttemptIdString());
+      int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
+
+      queryTracker
+          .registerFragment(null, request.getApplicationIdString(), fragmentSpec.getDagName(),
+              dagIdentifier,
+              fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(),
+              fragmentSpec.getAttemptNumber(), request.getUser());
+
+      String[] localDirs = queryTracker.getLocalDirs(null, fragmentSpec.getDagName(), request.getUser());
+      Preconditions.checkNotNull(localDirs);
 
-      // Setup up local dirs to be application specific, and create them.
-      for (int i = 0; i < localDirsBase.length; i++) {
-        localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], request.getApplicationIdString(),
-            request.getUser());
-        localFs.mkdirs(new Path(localDirs[i]));
-      }
-      // TODO Avoid this directory creation on each work-unit submission.
       if (LOG.isDebugEnabled()) {
         LOG.debug("Dirs are: " + Arrays.toString(localDirs));
       }
@@ -195,7 +186,9 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
 
      // TODO Unregistering does not happen at the moment, since there's no signals on when an app completes.
       LOG.info("DEBUG: Registering request with the ShuffleHandler");
-      ShuffleHandler.get().registerApplication(request.getApplicationIdString(), jobToken, request.getUser(), localDirs);
+      ShuffleHandler.get()
+          .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken,
+              request.getUser(), localDirs);
 
      ConcurrentMap<String, SourceStateProto> sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName());
      TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()),
@@ -209,10 +202,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun
     }
   }
 
-  private void notifyAMOfRejection(TaskRunnerCallable callable) {
-    LOG.error("Notifying AM of request rejection is not implemented yet!");
-  }
-
   @Override
   public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
     LOG.info("Processing state update: " + 
stringifySourceStateUpdateRequest(request));
@@ -220,6 +209,21 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
     dagMap.put(request.getSrcName(), request.getState());
   }
 
+  @Override
+  public void queryComplete(QueryCompleteRequestProto request) {
+    queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay());
+  }
+
+  @Override
+  public void terminateFragment(TerminateFragmentRequestProto request) {
+    // TODO Implement when this gets used.
+  }
+
+
+  private void notifyAMOfRejection(TaskRunnerCallable callable) {
+    LOG.error("Notifying AM of request rejection is not implemented yet!");
+  }
+
  private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
     StringBuilder sb = new StringBuilder();
     sb.append("dagName=").append(request.getDagName())

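The net effect of the submitWork() changes above is that per-fragment local-dir setup moves out of ContainerRunnerImpl and into the new QueryTracker, keyed by dag rather than by application. A condensed sketch of the new flow with hypothetical values (the null first argument is the query id, which is not yet available, matching the calls above):

    // Hypothetical values; signatures as declared in QueryTracker (added below).
    queryTracker.registerFragment(null, "application_1431_0001", "myDag", 3,
        "Map 1", 0, 0, "hive");
    String[] localDirs = queryTracker.getLocalDirs(null, "myDag", "hive"); // dag-scoped, created lazily
    // ... fragment runs ...
    queryTracker.queryComplete(null, "myDag", 300); // schedules directory deletion after 300s
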
http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 86b1f5c..fabacf7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -27,7 +27,10 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
 import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics;
 import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
@@ -252,17 +255,27 @@ public class LlapDaemon extends AbstractService implements ContainerRunner, Llap
   }
 
   @Override
-  public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request) throws
+  public void submitWork(SubmitWorkRequestProto request) throws
       IOException {
     numSubmissions.incrementAndGet();
     containerRunner.submitWork(request);
   }
 
   @Override
-  public void sourceStateUpdated(LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto request) {
+  public void sourceStateUpdated(SourceStateUpdatedRequestProto request) {
     containerRunner.sourceStateUpdated(request);
   }
 
+  @Override
+  public void queryComplete(QueryCompleteRequestProto request) {
+    containerRunner.queryComplete(request);
+  }
+
+  @Override
+  public void terminateFragment(TerminateFragmentRequestProto request) {
+    containerRunner.terminateFragment(request);
+  }
+
   @VisibleForTesting
   public long getNumSubmissions() {
     return numSubmissions.get();

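LlapDaemon itself only delegates the two new calls to the ContainerRunner. For illustration, a caller would build the request from the new proto messages (defined at the end of this commit); the field values here are made up, llapDaemon is a hypothetical instance, and terminateFragment is currently a no-op in ContainerRunnerImpl:

    TerminateFragmentRequestProto request = TerminateFragmentRequestProto.newBuilder()
        .setDagName("myDag")
        .setVertexName("Map 1")
        .setFragmentNumber(0)
        .setAttemptNumber(0)
        .build();
    llapDaemon.terminateFragment(request);
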
http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
index 01b53c2..9f161fe 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolClientImpl.java
@@ -20,10 +20,15 @@ import java.net.InetSocketAddress;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -66,6 +71,28 @@ public class LlapDaemonProtocolClientImpl implements LlapDaemonProtocolBlockingP
     }
   }
 
+  @Override
+  public QueryCompleteResponseProto queryComplete(RpcController controller,
+                                                  QueryCompleteRequestProto request) throws
+      ServiceException {
+    try {
+      return getProxy().queryComplete(null, request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public TerminateFragmentResponseProto terminateFragment(
+      RpcController controller,
+      TerminateFragmentRequestProto request) throws ServiceException {
+    try {
+      return getProxy().terminateFragment(null, request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   public LlapDaemonProtocolBlockingPB getProxy() throws IOException {
     if (proxy == null) {
       proxy = createProxy();

http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
index 0360a27..8cb9715 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemonProtocolServerImpl.java
@@ -27,10 +27,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
@@ -84,6 +88,22 @@ public class LlapDaemonProtocolServerImpl extends AbstractService
   }
 
   @Override
+  public QueryCompleteResponseProto queryComplete(RpcController controller,
+                                                  QueryCompleteRequestProto request) throws
+      ServiceException {
+    containerRunner.queryComplete(request);
+    return QueryCompleteResponseProto.getDefaultInstance();
+  }
+
+  @Override
+  public TerminateFragmentResponseProto terminateFragment(
+      RpcController controller,
+      TerminateFragmentRequestProto request) throws ServiceException {
+    containerRunner.terminateFragment(request);
+    return TerminateFragmentResponseProto.getDefaultInstance();
+  }
+
+  @Override
   public void serviceStart() {
     Configuration conf = getConfig();
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
new file mode 100644
index 0000000..bc18a77
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFileCleaner.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.common.CallableWithNdc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueryFileCleaner extends AbstractService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryFileCleaner.class);
+
+  private final ListeningScheduledExecutorService executorService;
+  private final FileSystem localFs;
+
+
+  public QueryFileCleaner(Configuration conf, FileSystem localFs) {
+    super(QueryFileCleaner.class.getName());
+    int numCleanerThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS,
+        LlapConfiguration.LLAP_DAEMON_NUM_FILE_CLEANER_THREADS_DEFAULT);
+    ScheduledExecutorService rawExecutor = Executors.newScheduledThreadPool(numCleanerThreads,
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("QueryFileCleaner %d").build());
+    this.executorService = MoreExecutors.listeningDecorator(rawExecutor);
+    this.localFs = localFs;
+  }
+
+  public void serviceStart() {
+    LOG.info(getName() + " started");
+  }
+
+  @Override
+  public void serviceStop() {
+    executorService.shutdownNow();
+    LOG.info(getName() + " stopped");
+  }
+
+  public void cleanupDir(String dir, long deleteDelay) {
+    LOG.info("Scheduling deletion of {} after {} seconds", dir, deleteDelay);
+    executorService.schedule(new FileCleanerCallable(dir), deleteDelay, TimeUnit.SECONDS);
+  }
+
+  private class FileCleanerCallable extends CallableWithNdc<Void> {
+
+    private final String dirToDelete;
+
+    private FileCleanerCallable(String dirToDelete) {
+      this.dirToDelete = dirToDelete;
+    }
+
+    @Override
+    protected Void callInternal() {
+      Path pathToDelete = new Path(dirToDelete);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleting path: " + pathToDelete);
+      }
+      try {
+        localFs.delete(pathToDelete, true);
+      } catch (IOException e) {
+        LOG.warn("Ignoring exception while cleaning up path: " + pathToDelete, 
e);
+      }
+      return null;
+    }
+  }
+}

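QueryFileCleaner follows the usual Hadoop AbstractService lifecycle; QueryTracker (below) constructs and starts it exactly this way. A minimal usage sketch, with a hypothetical directory path:

    QueryFileCleaner cleaner = new QueryFileCleaner(conf, FileSystem.getLocal(conf));
    cleaner.init(conf);
    cleaner.start();
    // Schedules a recursive delete of the dag directory 300 seconds from now.
    cleaner.cleanupDir("/grid/0/usercache/hive/appcache/application_1431_0001/3", 300);
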
http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
new file mode 100644
index 0000000..16d745b
--- /dev/null
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Tracks queries running within a daemon
+ */
+public class QueryTracker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(QueryTracker.class);
+  private final QueryFileCleaner queryFileCleaner;
+
+  // TODO Make use of the query id for caching when this is available.
+  private final ConcurrentHashMap<String, QueryInfo> queryInfoMap = new ConcurrentHashMap<>();
+
+  private final String[] localDirsBase;
+  private final FileSystem localFs;
+
+  public QueryTracker(Configuration conf, String[] localDirsBase) {
+    this.localDirsBase = localDirsBase;
+    try {
+      localFs = FileSystem.getLocal(conf);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to setup local filesystem instance", e);
+    }
+    queryFileCleaner = new QueryFileCleaner(conf, localFs);
+    queryFileCleaner.init(conf);
+    queryFileCleaner.start();
+  }
+
+
+  void registerFragment(String queryId, String appIdString, String dagName, int dagIdentifier,
+                        String vertexName, int fragmentNumber, int attemptNumber,
+                        String user) throws IOException {
+    QueryInfo queryInfo = queryInfoMap.get(dagName);
+    if (queryInfo == null) {
+      queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user);
+      queryInfoMap.putIfAbsent(dagName, queryInfo);
+    }
+  }
+
+  String[] getLocalDirs(String queryId, String dagName, String user) throws IOException {
+    QueryInfo queryInfo = queryInfoMap.get(dagName);
+    return queryInfo.getLocalDirs();
+  }
+
+  void queryComplete(String queryId, String dagName, long deleteDelay) {
+    LOG.info("Processing queryComplete for dagName={} with deleteDelay={} 
seconds", dagName, deleteDelay);
+    QueryInfo queryInfo = queryInfoMap.remove(dagName);
+    if (queryInfo == null) {
+      LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
+    }
+    String []localDirs = queryInfo.getLocalDirsNoCreate();
+    if (localDirs != null) {
+      for (String localDir : localDirs) {
+        queryFileCleaner.cleanupDir(localDir, deleteDelay);
+        ShuffleHandler.get().unregisterDag(localDir, dagName, 
queryInfo.dagIdentifier);
+      }
+    }
+    // TODO HIVE-10535 Cleanup map join cache
+  }
+
+  void shutdown() {
+    queryFileCleaner.stop();
+  }
+
+
+  private class QueryInfo {
+
+    private final String queryId;
+    private final String appIdString;
+    private final String dagName;
+    private final int dagIdentifier;
+    private final String user;
+    private String[] localDirs;
+
+    public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
+                     String user) {
+      this.queryId = queryId;
+      this.appIdString = appIdString;
+      this.dagName = dagName;
+      this.dagIdentifier = dagIdentifier;
+      this.user = user;
+    }
+
+
+
+
+    private synchronized void createLocalDirs() throws IOException {
+      if (localDirs == null) {
+        localDirs = new String[localDirsBase.length];
+        for (int i = 0; i < localDirsBase.length; i++) {
+          localDirs[i] = createAppSpecificLocalDir(localDirsBase[i], appIdString, user, dagIdentifier);
+          localFs.mkdirs(new Path(localDirs[i]));
+        }
+      }
+    }
+
+    private synchronized String[] getLocalDirs() throws IOException {
+      if (localDirs == null) {
+        createLocalDirs();
+      }
+      return localDirs;
+    }
+
+    private synchronized String[] getLocalDirsNoCreate() {
+      return this.localDirs;
+    }
+  }
+
+  private static String createAppSpecificLocalDir(String baseDir, String applicationIdString,
+                                                  String user, int dagIdentifier) {
+    // TODO This is broken for secure clusters. The app will not have permission to create these directories.
+    // May work via Slider - since the directory would already exist. Otherwise may need a custom shuffle handler.
+    // TODO This should be the process user - and not the user on behalf of whom the query is being submitted.
+    return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
+        "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier;
+  }
+}

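The only structural difference from the old ContainerRunnerImpl helper is the trailing dag identifier, giving each dag its own directory under the application cache. With hypothetical arguments (the method is private to QueryTracker, so this is illustration only):

    String dir = createAppSpecificLocalDir("/grid/0", "application_1431_0001", "hive", 3);
    // dir == "/grid/0/usercache/hive/appcache/application_1431_0001/3"
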
http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java
index 08e4787..b1d2cf7 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/DirWatcher.java
@@ -50,7 +50,7 @@ class DirWatcher {
   private static final Log LOG = LogFactory.getLog(DirWatcher.class);
 
   private static enum Type {
-    BASE, // App Base Dir
+    BASE, // App Base Dir / ${dagDir}
     OUTPUT, // appBase/output/
     FINAL, // appBase/output/attemptDir
   }
@@ -95,9 +95,12 @@ class DirWatcher {
    * @param expiry when to expire the watch - in ms
    * @throws IOException
    */
-  void registerApplicationDir(String pathString, String appId, String user, long expiry) throws IOException {
+  void registerDagDir(String pathString, String appId, int dagIdentifier, String user, long expiry) throws IOException {
+    // The path string contains the dag Identifier
     Path path = FileSystems.getDefault().getPath(pathString);
-    WatchedPathInfo watchedPathInfo = new WatchedPathInfo(System.currentTimeMillis() + expiry, Type.BASE, appId, user);
+    WatchedPathInfo watchedPathInfo =
+        new WatchedPathInfo(System.currentTimeMillis() + expiry, Type.BASE, appId, dagIdentifier,
+            user);
     watchedPaths.put(path, watchedPathInfo);
     WatchKey watchKey = path.register(watchService, ENTRY_CREATE);
     watchedPathInfo.setWatchKey(watchKey);
@@ -106,6 +109,10 @@ class DirWatcher {
    // TODO Watches on the output dirs need to be cancelled at some point. For now - via the expiry.
   }
 
+  void unregisterDagDir(String pathString, String appId, int dagIdentifier) {
+    // TODO Implement to remove all watches for the specified pathString and its sub-tree
+  }
+
   /**
    * Invoke when a pathIdentifier has been found, or is no longer of interest
    * @param pathIdentifier
@@ -226,7 +233,7 @@ class DirWatcher {
               cancelledWatch = true;
               watchKey.cancel();
             } else {
-              LOG.warn("DEBUG: Found unexpected directory: " + event.context() + " under " + watchedPath);
+              LOG.warn("DEBUG: Found unexpected directory while looking for OUTPUT: " + event.context() + " under " + watchedPath);
             }
             break;
           case OUTPUT:
@@ -349,15 +356,17 @@ class DirWatcher {
     final long expiry;
     final Type type;
     final String appId;
+    final int dagId;
     final String user;
     final String attemptId;
     final AttemptPathIdentifier pathIdentifier;
     WatchKey watchKey;
 
-    public WatchedPathInfo(long expiry, Type type, String jobId, String user) {
+    public WatchedPathInfo(long expiry, Type type, String jobId, int dagId, String user) {
       this.expiry = expiry;
       this.type = type;
       this.appId = jobId;
+      this.dagId = dagId;
       this.user = user;
       this.attemptId = null;
       this.pathIdentifier = null;
@@ -367,10 +376,11 @@ class DirWatcher {
       this.expiry = other.expiry;
       this.appId = other.appId;
       this.user = other.user;
+      this.dagId = other.dagId;
       this.type = type;
       this.attemptId = attemptId;
       if (attemptId != null) {
-        pathIdentifier = new AttemptPathIdentifier(appId, user, attemptId);
+        pathIdentifier = new AttemptPathIdentifier(appId, dagId, user, attemptId);
       } else {
         pathIdentifier = null;
       }

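A watch is now registered per dag directory rather than per application directory. A sketch of the call ShuffleHandler (below) makes, with a hypothetical path and dirWatcher instance (expiry is in milliseconds):

    dirWatcher.registerDagDir("/grid/0/usercache/hive/appcache/application_1431_0001/3",
        "application_1431_0001", 3, "hive", 5 * 60 * 1000);
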
http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
index d640b36..2572c75 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java
@@ -408,11 +408,13 @@ public class ShuffleHandler implements AttemptRegistrationListener {
   /**
   * Register an application and its associated credentials and user information.
    * @param applicationIdString
+   * @param dagIdentifier
    * @param appToken
    * @param user
    */
-  public void registerApplication(String applicationIdString, Token<JobTokenIdentifier> appToken,
-                                  String user, String [] appDirs) {
+  public void registerDag(String applicationIdString, int dagIdentifier,
+                          Token<JobTokenIdentifier> appToken,
+                          String user, String[] appDirs) {
    // TODO Fix this. There's a race here, where an app may think everything is registered, finish really fast, send events and the consumer will not find the registration.
    Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true));
     if (registered == null) {
@@ -421,7 +423,8 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       if (dirWatcher != null) {
         for (String appDir : appDirs) {
           try {
-            dirWatcher.registerApplicationDir(appDir, applicationIdString, user, 5 * 60 * 1000);
+            dirWatcher.registerDagDir(appDir, applicationIdString, dagIdentifier, user,
+                5 * 60 * 1000);
           } catch (IOException e) {
             LOG.warn("Unable to register dir: " + appDir + " with watcher");
           }
@@ -430,9 +433,13 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     }
   }
 
+  public void unregisterDag(String dir, String applicationIdString, int dagIdentifier) {
+    dirWatcher.unregisterDagDir(dir, applicationIdString, dagIdentifier);
+    // TODO Cleanup registered tokens and dag info
+  }
+
   public void unregisterApplication(String applicationIdString) {
     removeJobShuffleInfo(applicationIdString);
-    // TOOD Unregister from the dirWatcher
   }
 
 
@@ -546,7 +553,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
           @Override
           public AttemptPathInfo load(AttemptPathIdentifier key) throws
               Exception {
-            String base = getBaseLocation(key.jobId, key.user);
+            String base = getBaseLocation(key.jobId, key.dagId, key.user);
             String attemptBase = base + key.attemptId;
             Path indexFileName =
                lDirAlloc.getLocalPathToRead(attemptBase + "/" + INDEX_FILE_NAME, conf);
@@ -633,27 +640,31 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       final List<String> mapIds = splitMaps(q.get("map"));
       final List<String> reduceQ = q.get("reduce");
       final List<String> jobQ = q.get("job");
+      final List<String> dagIdQ = q.get("dag");
       if (LOG.isDebugEnabled()) {
         LOG.debug("RECV: " + request.getUri() +
             "\n  mapId: " + mapIds +
             "\n  reduceId: " + reduceQ +
             "\n  jobId: " + jobQ +
+            "\n  dagId: " + dagIdQ +
             "\n  keepAlive: " + keepAliveParam);
       }
 
-      if (mapIds == null || reduceQ == null || jobQ == null) {
+      if (mapIds == null || reduceQ == null || jobQ == null || dagIdQ == null) {
         sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
         return;
       }
-      if (reduceQ.size() != 1 || jobQ.size() != 1) {
+      if (reduceQ.size() != 1 || jobQ.size() != 1 || dagIdQ.size() != 1) {
         sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
         return;
       }
       int reduceId;
       String jobId;
+      int dagId;
       try {
         reduceId = Integer.parseInt(reduceQ.get(0));
         jobId = jobQ.get(0);
+        dagId = Integer.parseInt(dagIdQ.get(0));
       } catch (NumberFormatException e) {
         sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
         return;
@@ -683,7 +694,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       String user = userRsrc.get(jobId);
 
       try {
-        populateHeaders(mapIds, jobId, user, reduceId,
+        populateHeaders(mapIds, jobId, dagId, user, reduceId,
             response, keepAliveParam, mapOutputInfoMap);
       } catch(IOException e) {
         ch.write(response);
@@ -701,7 +712,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
          // This will be hit if there's a large number of mapIds in a single request
          // (Determined by the cache size further up), in which case we go to disk again.
           if (info == null) {
-            info = getMapOutputInfo(jobId, mapId, reduceId, user);
+            info = getMapOutputInfo(jobId, dagId, mapId, reduceId, user);
           }
           lastMap =
               sendMapOutput(ctx, ch, user, mapId,
@@ -730,11 +741,11 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     }
 
 
-    protected MapOutputInfo getMapOutputInfo(String jobId, String mapId,
+    protected MapOutputInfo getMapOutputInfo(String jobId, int dagId, String mapId,
                                             int reduce, String user) throws IOException {
       AttemptPathInfo pathInfo;
       try {
-        AttemptPathIdentifier identifier = new AttemptPathIdentifier(jobId, user, mapId);
+        AttemptPathIdentifier identifier = new AttemptPathIdentifier(jobId, dagId, user, mapId);
         pathInfo = pathCache.get(identifier);
         LOG.info("DEBUG: Retrieved pathInfo for " + identifier + " check for 
corresponding loaded messages to determine whether it was loaded or cached");
       } catch (ExecutionException e) {
@@ -758,7 +769,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
       return outputInfo;
     }
 
-    protected void populateHeaders(List<String> mapIds, String jobId,
+    protected void populateHeaders(List<String> mapIds, String jobId, int dagId,
         String user, int reduce, HttpResponse response,
         boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
         throws IOException {
@@ -767,7 +778,7 @@ public class ShuffleHandler implements AttemptRegistrationListener {
 
       long contentLength = 0;
       for (String mapId : mapIds) {
-        MapOutputInfo outputInfo = getMapOutputInfo(jobId, mapId, reduce, user);
+        MapOutputInfo outputInfo = getMapOutputInfo(jobId, dagId, mapId, reduce, user);
         // mapOutputInfoMap is used to share the lookups with the caller
         if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
           mapOutputInfoMap.put(mapId, outputInfo);
@@ -952,8 +963,8 @@ public class ShuffleHandler implements AttemptRegistrationListener {
   private static final String USERCACHE_CONSTANT = "usercache";
   private static final String APPCACHE_CONSTANT = "appcache";
 
-  private static String getBaseLocation(String jobIdString, String user) {
-    // $x/$user/appcache/$appId/output/$mapId
+  private static String getBaseLocation(String jobIdString, int dagId, String user) {
+    // $x/$user/appcache/$appId/${dagId}/output/$mapId
     // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
     // between App and Job
     String parts[] = jobIdString.split("_");
@@ -963,7 +974,9 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     final String baseStr =
         USERCACHE_CONSTANT + "/" + user + "/"
             + APPCACHE_CONSTANT + "/"
-            + ConverterUtils.toString(appID) + "/output" + "/";
+            + ConverterUtils.toString(appID)
+            +  "/" + dagId
+            +  "/output" + "/";
     return baseStr;
   }
 
@@ -980,11 +993,13 @@ public class ShuffleHandler implements AttemptRegistrationListener {
 
   static class AttemptPathIdentifier {
     private final String jobId;
+    private final int dagId;
     private final String user;
     private final String attemptId;
 
-    public AttemptPathIdentifier(String jobId, String user, String attemptId) {
+    public AttemptPathIdentifier(String jobId, int dagId, String user, String attemptId) {
       this.jobId = jobId;
+      this.dagId = dagId;
       this.user = user;
       this.attemptId = attemptId;
     }
@@ -1000,19 +1015,20 @@ public class ShuffleHandler implements AttemptRegistrationListener {
 
       AttemptPathIdentifier that = (AttemptPathIdentifier) o;
 
-      if (!attemptId.equals(that.attemptId)) {
+      if (dagId != that.dagId) {
         return false;
       }
       if (!jobId.equals(that.jobId)) {
         return false;
       }
+      return attemptId.equals(that.attemptId);
 
-      return true;
     }
 
     @Override
     public int hashCode() {
       int result = jobId.hashCode();
+      result = 31 * result + dagId;
       result = 31 * result + attemptId.hashCode();
       return result;
     }
@@ -1020,11 +1036,11 @@ public class ShuffleHandler implements AttemptRegistrationListener {
     @Override
     public String toString() {
       return "AttemptPathIdentifier{" +
-          "attemptId='" + attemptId + '\'' +
-          ", jobId='" + jobId + '\'' +
+          "jobId='" + jobId + '\'' +
+          ", dagId=" + dagId +
+          ", user='" + user + '\'' +
+          ", attemptId='" + attemptId + '\'' +
           '}';
     }
   }
-
-
 }

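Fetchers must now pass the dag id alongside job, map and reduce, since map outputs live under the dag-scoped directory. An illustrative fetch URL (host, port and ids are made up; the /mapOutput path follows the stock Hadoop ShuffleHandler convention):

    // Resolved under usercache/hive/appcache/application_1431_0001/3/output/
    String url = "http://worker-1.example.com:15551/mapOutput"
        + "?job=job_1431_0001&dag=3&reduce=0&map=attempt_1431_0001_3_01_000004_0";
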
http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index d35b04a..99459e4 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -16,7 +16,9 @@ package org.apache.hadoop.hive.llap.tezplugins;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.RejectedExecutionException;
@@ -26,12 +28,12 @@ import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -61,10 +63,12 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
 
-  private static final Log LOG = LogFactory.getLog(LlapTaskCommunicator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LlapTaskCommunicator.class);
 
   private final SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
   private final ConcurrentMap<String, ByteBuffer> credentialMap;
@@ -73,8 +77,10 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   // When DAG specific cleanup happens, it'll be better to link this to a DAG though.
   private final EntityTracker entityTracker = new EntityTracker();
   private final SourceStateTracker sourceStateTracker;
+  private final Set<LlapNodeId> nodesForQuery = new HashSet<>();
 
   private TaskCommunicator communicator;
+  private long deleteDelayOnDagComplete;
   private final LlapTaskUmbilicalProtocol umbilical;
 
   private volatile String currentDagName;
@@ -106,6 +112,11 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     int numThreads = conf.getInt(LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS,
         LlapConfiguration.LLAP_DAEMON_COMMUNICATOR_NUM_THREADS_DEFAULT);
     this.communicator = new TaskCommunicator(numThreads);
+    this.deleteDelayOnDagComplete = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS,
+        LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT);
+    LOG.info("Running LlapTaskCommunicator with "
+        + "fileCleanupDelay=" + deleteDelayOnDagComplete
+        + ", numCommunicatorThreads=" + numThreads);
     this.communicator.init(conf);
   }
 
@@ -131,21 +142,23 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
           new JobTokenSecretManager();
       jobTokenSecretManager.addTokenForJob(tokenIdentifier, sessionToken);
 
+      int numHandlers = conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
+          TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT);
       server = new RPC.Builder(conf)
           .setProtocol(LlapTaskUmbilicalProtocol.class)
           .setBindAddress("0.0.0.0")
           .setPort(0)
           .setInstance(umbilical)
-          .setNumHandlers(
-              conf.getInt(TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
-                  TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT))
+          .setNumHandlers(numHandlers)
           .setSecretManager(jobTokenSecretManager).build();
 
       // Do serviceACLs need to be refreshed, like in Tez ?
 
       server.start();
       this.address = NetUtils.getConnectAddress(server);
-      LOG.info("Started LlapUmbilical: " + umbilical.getClass().getName() + " 
at address: " + address);
+      LOG.info(
+          "Started LlapUmbilical: " + umbilical.getClass().getName() + " at 
address: " + address +
+              " with numHandlers=" + numHandlers);
     } catch (IOException e) {
       throw new TezUncheckedException(e);
     }
@@ -192,7 +205,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
           ", while trying to launch task: " + taskSpec.getTaskAttemptID());
     }
 
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
    entityTracker.registerTaskAttempt(containerId, taskSpec.getTaskAttemptID(), host, port);
+    nodesForQuery.add(nodeId);
 
    sourceStateTracker.registerTaskForStateUpdates(host, port, taskSpec.getInputs());
    FragmentRuntimeInfo fragmentRuntimeInfo = sourceStateTracker.getFragmentRuntimeInfo(taskSpec.getDAGName(),
@@ -269,6 +284,29 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
   }
 
   @Override
+  public void dagComplete(final String dagName) {
+    QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder().setDagName(
+        dagName).setDeleteDelay(deleteDelayOnDagComplete).build();
+    for (final LlapNodeId llapNodeId : nodesForQuery) {
+      LOG.info("Sending dagComplete message for {}, to {}", dagName, 
llapNodeId);
+      communicator.sendQueryComplete(request, llapNodeId.getHostname(), 
llapNodeId.getPort(),
+          new 
TaskCommunicator.ExecuteRequestCallback<LlapDaemonProtocolProtos.QueryCompleteResponseProto>()
 {
+            @Override
+            public void setResponse(LlapDaemonProtocolProtos.QueryCompleteResponseProto response) {
+            }
+
+            @Override
+            public void indicateError(Throwable t) {
+              LOG.warn("Failed to indicate dag complete dagId={} to node {}", 
dagName, llapNodeId);
+            }
+          });
+    }
+
+    nodesForQuery.clear();
+    // TODO Ideally move some of the other cleanup code from resetCurrentDag over here
+  }
+
+  @Override
   public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
     // Delegate updates over to the source state tracker.
     sourceStateTracker
@@ -301,9 +339,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
     // Working on the assumption that a single DAG runs at a time per AM.
     currentDagName = newDagName;
     sourceStateTracker.resetState(newDagName);
+    nodesForQuery.clear();
     LOG.info("CurrentDag set to: " + newDagName);
-    // TODO Additional state reset. Potentially sending messages to node to reset.
-    // Is it possible for heartbeats to come in from lost tasks - those should be told to die, which
+    // TODO Is it possible for heartbeats to come in from lost tasks - those should be told to die, which
     // is likely already happening.
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
index 3b4612d..d536eb2 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/TaskCommunicator.java
@@ -31,6 +31,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.llap.daemon.LlapDaemonProtocolBlockingPB;
 import org.apache.hadoop.hive.llap.daemon.impl.LlapDaemonProtocolClientImpl;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
@@ -74,7 +76,8 @@ public class TaskCommunicator extends AbstractService {
 
   }
 
-  public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host, final int port,
+  public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto request, final String host,
+                                    final int port,
                                     final ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
     ListenableFuture<SourceStateUpdatedResponseProto> future =
        executor.submit(new SendSourceStateUpdateCallable(host, port, request));
@@ -91,7 +94,26 @@ public class TaskCommunicator extends AbstractService {
     });
   }
 
-  private static abstract class CallableRequest<REQUEST extends  Message, RESPONSE extends Message> implements Callable {
+  public void sendQueryComplete(final QueryCompleteRequestProto request, final String host,
+                                final int port,
+                                final ExecuteRequestCallback<QueryCompleteResponseProto> callback) {
+    ListenableFuture<QueryCompleteResponseProto> future =
+        executor.submit(new SendQueryCompleteCallable(host, port, request));
+    Futures.addCallback(future, new FutureCallback<QueryCompleteResponseProto>() {
+      @Override
+      public void onSuccess(QueryCompleteResponseProto result) {
+        callback.setResponse(result);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        callback.indicateError(t);
+      }
+    });
+  }
+
+  private static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message>
+      implements Callable {
 
     final String hostname;
     final int port;
@@ -134,6 +156,20 @@ public class TaskCommunicator extends AbstractService {
     }
   }
 
+  private class SendQueryCompleteCallable
+      extends CallableRequest<QueryCompleteRequestProto, QueryCompleteResponseProto> {
+
+    protected SendQueryCompleteCallable(String hostname, int port,
+                                        QueryCompleteRequestProto queryCompleteRequestProto) {
+      super(hostname, port, queryCompleteRequestProto);
+    }
+
+    @Override
+    public QueryCompleteResponseProto call() throws Exception {
+      return getProxy(hostname, port).queryComplete(null, request);
+    }
+  }
+
   public interface ExecuteRequestCallback<T extends Message> {
     void setResponse(T response);
     void indicateError(Throwable t);

http://git-wip-us.apache.org/repos/asf/hive/blob/cae3ec16/llap-server/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-server/src/protobuf/LlapDaemonProtocol.proto b/llap-server/src/protobuf/LlapDaemonProtocol.proto
index f7e6d2b..e098e87 100644
--- a/llap-server/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-server/src/protobuf/LlapDaemonProtocol.proto
@@ -98,7 +98,30 @@ message SourceStateUpdatedRequestProto {
 message SourceStateUpdatedResponseProto {
 }
 
+message QueryCompleteRequestProto {
+  optional string query_id = 1;
+  optional string dag_name = 2;
+  optional int64 delete_delay = 3 [default = 0];
+}
+
+message QueryCompleteResponseProto {
+}
+
+message TerminateFragmentRequestProto {
+  optional string query_id = 1;
+  optional string dag_name = 2;
+  optional int32 dag_attempt_number = 3;
+  optional string vertex_name = 4;
+  optional int32 fragment_number = 5;
+  optional int32 attempt_number = 6;
+}
+
+message TerminateFragmentResponseProto {
+}
+
 service LlapDaemonProtocol {
   rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
  rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
+  rpc queryComplete(QueryCompleteRequestProto) returns (QueryCompleteResponseProto);
+  rpc terminateFragment(TerminateFragmentRequestProto) returns (TerminateFragmentResponseProto);
 }

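All fields in the new messages are optional, so callers set only what they need. Building and sending a query-complete request uses the generated protobuf builders, as LlapTaskCommunicator.dagComplete does above; a minimal sketch (dag name and delay are made up, and client stands for any LlapDaemonProtocolBlockingPB implementation such as LlapDaemonProtocolClientImpl):

    QueryCompleteRequestProto request = QueryCompleteRequestProto.newBuilder()
        .setDagName("myDag")
        .setDeleteDelay(300L)
        .build();
    // The RpcController argument may be null, as in the client implementation above.
    client.queryComplete(null, request);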