HIVE-12934. Refactor llap module structure to allow for a usable client. 
(Siddharth Seth, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4185d9b8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4185d9b8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4185d9b8

Branch: refs/heads/master
Commit: 4185d9b8e2eecfef3b5a38899f6928fa82c01e99
Parents: 11f1e47
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Feb 1 09:45:09 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Feb 1 09:45:09 2016 -0800

----------------------------------------------------------------------
 llap-client/pom.xml                             |    87 +-
 .../hive/llap/security/LlapTokenIdentifier.java |    82 -
 .../hive/llap/security/LlapTokenProvider.java   |    27 -
 .../hive/llap/tez/LlapProtocolClientProxy.java  |   509 +
 .../tez/TestLlapDaemonProtocolClientProxy.java  |   144 +
 llap-common/pom.xml                             |   235 +
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 14553 +++++++++++++++++
 .../org/apache/hadoop/hive/llap/LlapNodeId.java |    86 +
 .../impl/LlapManagementProtocolClientImpl.java  |    82 +
 .../hive/llap/impl/LlapProtocolClientImpl.java  |   125 +
 .../llap/protocol/LlapManagementProtocolPB.java |    26 +
 .../llap/protocol/LlapProtocolBlockingPB.java   |    30 +
 .../protocol/LlapTaskUmbilicalProtocol.java     |    42 +
 .../hive/llap/security/LlapTokenIdentifier.java |    82 +
 .../hive/llap/security/LlapTokenProvider.java   |    27 +
 .../hive/llap/security/LlapTokenSelector.java   |    53 +
 .../apache/hadoop/hive/llap/tez/Converters.java |   265 +
 .../src/protobuf/LlapDaemonProtocol.proto       |   148 +
 .../hive/llap/testhelpers/ControlledClock.java  |    43 +
 llap-server/pom.xml                             |   199 +-
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 14553 -----------------
 .../org/apache/hadoop/hive/llap/LlapNodeId.java |    86 -
 .../hadoop/hive/llap/cli/LlapServiceDriver.java |     8 +
 .../daemon/LlapDaemonProtocolBlockingPB.java    |    28 -
 .../LlapManagementProtocolBlockingPB.java       |    24 -
 .../hive/llap/daemon/impl/LlapDaemon.java       |     6 +-
 .../impl/LlapDaemonProtocolClientImpl.java      |   125 -
 .../impl/LlapDaemonProtocolServerImpl.java      |   251 -
 .../impl/LlapManagementProtocolClientImpl.java  |    82 -
 .../daemon/impl/LlapProtocolServerImpl.java     |   251 +
 .../llap/daemon/impl/QueryFragmentInfo.java     |    13 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |     2 +-
 .../protocol/LlapTaskUmbilicalProtocol.java     |    42 -
 .../llap/security/LlapDaemonPolicyProvider.java |     8 +-
 .../hive/llap/security/LlapSecurityHelper.java  |     2 +-
 .../llap/security/LlapServerSecurityInfo.java   |    10 +-
 .../hive/llap/security/LlapTokenSelector.java   |    53 -
 .../hadoop/hive/llap/tezplugins/Converters.java |   265 -
 .../llap/tezplugins/LlapContainerLauncher.java  |    43 -
 .../LlapDaemonProtocolClientProxy.java          |   509 -
 .../llap/tezplugins/LlapTaskCommunicator.java   |   755 -
 .../tezplugins/LlapUmbilicalPolicyProvider.java |    42 -
 .../tezplugins/helpers/SourceStateTracker.java  |   296 -
 .../apache/tez/dag/app/rm/ContainerFactory.java |    51 -
 .../dag/app/rm/LlapTaskSchedulerService.java    |  1512 --
 .../src/protobuf/LlapDaemonProtocol.proto       |   148 -
 .../impl/TestLlapDaemonProtocolServerImpl.java  |    11 +-
 .../TestLlapDaemonProtocolClientProxy.java      |   143 -
 .../tezplugins/TestLlapTaskCommunicator.java    |   100 -
 .../app/rm/TestLlapTaskSchedulerService.java    |   685 -
 llap-tez/pom.xml                                |   200 +
 .../hive/llap/tezplugins/ContainerFactory.java  |    51 +
 .../llap/tezplugins/LlapContainerLauncher.java  |    43 +
 .../llap/tezplugins/LlapTaskCommunicator.java   |   757 +
 .../tezplugins/LlapTaskSchedulerService.java    |  1512 ++
 .../hive/llap/tezplugins/LlapTezUtils.java      |    29 +
 .../tezplugins/LlapUmbilicalPolicyProvider.java |    42 +
 .../tezplugins/helpers/SourceStateTracker.java  |   289 +
 .../tezplugins/TestLlapTaskCommunicator.java    |   100 +
 .../TestLlapTaskSchedulerService.java           |   684 +
 pom.xml                                         |     4 +-
 ql/pom.xml                                      |    54 +
 .../hive/ql/exec/tez/TezSessionState.java       |    58 +-
 63 files changed, 20631 insertions(+), 20141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-client/pom.xml
----------------------------------------------------------------------
diff --git a/llap-client/pom.xml b/llap-client/pom.xml
index f6a5629..50c06a4 100644
--- a/llap-client/pom.xml
+++ b/llap-client/pom.xml
@@ -32,7 +32,7 @@
   </properties>
 
   <dependencies>
-    <!-- dependencies are always listed in sorted order by groupId, artifectId 
-->
+    <!-- dependencies are always listed in sorted order by groupId, artifactId 
-->
     <!-- intra-project -->
     <dependency>
       <groupId>org.apache.hive</groupId>
@@ -41,41 +41,26 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
-      <artifactId>hive-serde</artifactId>
+      <artifactId>hive-llap-common</artifactId>
       <version>${project.version}</version>
     </dependency>
     <!-- inter-project -->
     <dependency>
-      <groupId>commons-codec</groupId>
-      <artifactId>commons-codec</artifactId>
-      <version>${commons-codec.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>commons-lang</groupId>
-      <artifactId>commons-lang</artifactId>
-      <version>${commons-lang.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.thrift</groupId>
-      <artifactId>libthrift</artifactId>
-      <version>${libthrift.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <version>${hadoop.version}</version>
       <optional>true</optional>
-          <exclusions>
-            <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>commmons-logging</groupId>
-            <artifactId>commons-logging</artifactId>
-          </exclusion>
-        </exclusions>
-   </dependency>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <!-- commons-logging is published as commons-logging:commons-logging; a misspelled groupId never matches. -->
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
@@ -96,15 +81,15 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <version>${mockito-all.version}</version>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.sun.jersey</groupId>
-      <artifactId>jersey-servlet</artifactId>
-      <version>${jersey.version}</version>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-all.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -113,32 +98,18 @@
       <version>${hadoop.version}</version>
       <classifier>tests</classifier>
       <scope>test</scope>
-          <exclusions>
-            <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>commmons-logging</groupId>
-            <artifactId>commons-logging</artifactId>
-          </exclusion>
-        </exclusions>
-   </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <version>${hadoop.version}</version>
-      <classifier>tests</classifier>
-      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <!-- commons-logging is published as commons-logging:commons-logging; a misspelled groupId never matches. -->
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
   </dependencies>
-
   <build>
     <sourceDirectory>${basedir}/src/java</sourceDirectory>
     <testSourceDirectory>${basedir}/src/test</testSourceDirectory>

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
deleted file mode 100644
index f0bb495..0000000
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenIdentifier.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.token.Token;
-import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
-
-/** For now, a LLAP token gives access to any LLAP server. */
-public class LlapTokenIdentifier extends AbstractDelegationTokenIdentifier {
-  private static final String KIND = "LLAP_TOKEN";
-  public static final Text KIND_NAME = new Text(KIND);
-
-  public LlapTokenIdentifier() {
-    super();
-  }
-
-  public LlapTokenIdentifier(Text owner, Text renewer, Text realUser) {
-    super(owner, renewer, realUser);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    // Nothing right now.
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    // Nothing right now.
-  }
-
-  @Override
-  public Text getKind() {
-    return KIND_NAME;
-  }
-
-  @Override
-  public int hashCode() {
-    return -1;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    return (other != null) && 
other.getClass().isAssignableFrom(this.getClass());
-  }
-
-  @Override
-  public String toString() {
-    return KIND;
-  }
-
-  @InterfaceAudience.Private
-  public static class Renewer extends Token.TrivialRenewer {
-    @Override
-    protected Text getKind() {
-      return KIND_NAME;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
deleted file mode 100644
index 2e99a28..0000000
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/security/LlapTokenProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.security;
-
-import java.io.IOException;
-
-import org.apache.hadoop.security.token.Token;
-
-public interface LlapTokenProvider {
-  Token<LlapTokenIdentifier> getDelegationToken() throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
new file mode 100644
index 0000000..5b0674a
--- /dev/null
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/tez/LlapProtocolClientProxy.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tez;
+
+import javax.net.SocketFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Message;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.protocol.LlapProtocolBlockingPB;
+import org.apache.hadoop.hive.llap.impl.LlapProtocolClientImpl;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteResponseProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedResponseProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkResponseProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto;
+import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.service.AbstractService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LlapProtocolClientProxy extends AbstractService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LlapProtocolClientProxy.class);
+
+  private final ConcurrentMap<String, LlapProtocolBlockingPB> hostProxies;
+
+  private final RequestManager requestManager;
+  private final RetryPolicy retryPolicy;
+  private final SocketFactory socketFactory;
+
+  private final ListeningExecutorService requestManagerExecutor;
+  private volatile ListenableFuture<Void> requestManagerFuture;
+  private final Token<LlapTokenIdentifier> llapToken;
+
+  /**
+   * Creates the proxy together with its request manager and the single-threaded executor that
+   * will host the manager's run loop. Nothing is started here; the run loop is submitted in
+   * {@link #serviceStart()}.
+   *
+   * @param numThreads size of the thread pool the {@link RequestManager} uses to send requests
+   * @param conf configuration used for the socket factory and the connection retry settings
+   * @param llapToken token added to the current user before a per-node client is created in
+   *                  {@code getProxy}; when {@code null} the client is created without a token
+   */
+  public LlapProtocolClientProxy(
+      int numThreads, Configuration conf, Token<LlapTokenIdentifier> llapToken) {
+    super(LlapProtocolClientProxy.class.getSimpleName());
+    this.hostProxies = new ConcurrentHashMap<>();
+    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    this.llapToken = llapToken;
+
+    // Retry connecting with a fixed sleep until the configured connection timeout has elapsed.
+    long connectionTimeout = HiveConf.getTimeVar(conf,
+        ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    long retrySleep = HiveConf.getTimeVar(conf,
+        ConfVars.LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MS,
+        TimeUnit.MILLISECONDS);
+    this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(
+        connectionTimeout, retrySleep, TimeUnit.MILLISECONDS);
+
+    this.requestManager = new RequestManager(numThreads);
+    ExecutorService localExecutor = Executors.newFixedThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build());
+    this.requestManagerExecutor = MoreExecutors.listeningDecorator(localExecutor);
+
+    // Explicit separators keep the three values from running together in the log line.
+    LOG.info("Setting up taskCommunicator with numThreads={}, retryTime(millis)={}, "
+        + "retrySleep(millis)={}", numThreads, connectionTimeout, retrySleep);
+  }
+
+  /**
+   * Submits the request manager's run loop to its dedicated executor and registers a callback
+   * that logs whether the loop ended normally or with an error.
+   */
+  @Override
+  public void serviceStart() {
+    requestManagerFuture = requestManagerExecutor.submit(requestManager);
+    Futures.addCallback(requestManagerFuture, new FutureCallback<Void>() {
+      @Override
+      public void onSuccess(Void result) {
+        LOG.info("RequestManager shutdown");
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        LOG.warn("RequestManager shutdown with error", t);
+      }
+    });
+  }
+
+  /**
+   * Stops the run loop, if it was ever started, and shuts down the executor that hosts it.
+   */
+  @Override
+  public void serviceStop() {
+    // requestManagerFuture is only assigned in serviceStart(), so null means "never started".
+    if (requestManagerFuture != null) {
+      // Mark the manager as shut down before interrupting, so the run loop treats the
+      // interrupt caused by cancel(true) as an expected exit.
+      requestManager.shutdown();
+      requestManagerFuture.cancel(true);
+    }
+    requestManagerExecutor.shutdown();
+  }
+
+  /**
+   * Queues a submitWork call for the daemon at {@code host:port}. The call is only queued here;
+   * it is executed later by the request manager.
+   *
+   * @param request the request to send
+   * @param host hostname of the target daemon
+   * @param port port of the target daemon
+   * @param callback receives the response, or the error if the call fails
+   */
+  public void sendSubmitWork(SubmitWorkRequestProto request, String host, int 
port,
+                         final ExecuteRequestCallback<SubmitWorkResponseProto> 
callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    requestManager.queueRequest(new SubmitWorkCallable(nodeId, request, 
callback));
+  }
+
+  /**
+   * Queues a sourceStateUpdated call for the daemon at {@code host:port}. The call is only
+   * queued here; it is executed later by the request manager.
+   *
+   * @param request the request to send
+   * @param host hostname of the target daemon
+   * @param port port of the target daemon
+   * @param callback receives the response, or the error if the call fails
+   */
+  public void sendSourceStateUpdate(final SourceStateUpdatedRequestProto 
request, final String host,
+                                    final int port,
+                                    final 
ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    requestManager.queueRequest(
+        new SendSourceStateUpdateCallable(nodeId, request, callback));
+  }
+
+  /**
+   * Queues a queryComplete call for the daemon at {@code host:port}. The call is only queued
+   * here; it is executed later by the request manager.
+   *
+   * @param request the request to send
+   * @param host hostname of the target daemon
+   * @param port port of the target daemon
+   * @param callback receives the response, or the error if the call fails
+   */
+  public void sendQueryComplete(final QueryCompleteRequestProto request, final 
String host,
+                                final int port,
+                                final 
ExecuteRequestCallback<QueryCompleteResponseProto> callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    requestManager.queueRequest(new SendQueryCompleteCallable(nodeId, request, 
callback));
+  }
+
+  /**
+   * Queues a terminateFragment call for the daemon at {@code host:port}. The call is only
+   * queued here; it is executed later by the request manager.
+   *
+   * @param request the request to send
+   * @param host hostname of the target daemon
+   * @param port port of the target daemon
+   * @param callback receives the response, or the error if the call fails
+   */
+  public void sendTerminateFragment(final TerminateFragmentRequestProto 
request, final String host,
+                                    final int port,
+                                    final 
ExecuteRequestCallback<TerminateFragmentResponseProto> callback) {
+    LlapNodeId nodeId = LlapNodeId.getInstance(host, port);
+    requestManager.queueRequest(new SendTerminateFragmentCallable(nodeId, 
request, callback));
+  }
+
+  @VisibleForTesting
+  static class RequestManager implements Callable<Void> {
+
+    private final Lock lock = new ReentrantLock();
+    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+    private final Condition queueCondition = lock.newCondition();
+    private final AtomicBoolean shouldRun = new AtomicBoolean(false);
+
+    private final int maxConcurrentRequestsPerNode = 1;
+    private final ListeningExecutorService executor;
+
+
+    // Tracks new additions via add, while the loop is processing existing 
ones.
+    private final LinkedList<CallableRequest> newRequestList = new 
LinkedList<>();
+
+    // Tracks existing requests which are cycled through.
+    private final LinkedList<CallableRequest> pendingRequests = new 
LinkedList<>();
+
+    // Tracks requests executing per node
+    private final ConcurrentMap<LlapNodeId, AtomicInteger> runningRequests = 
new ConcurrentHashMap<>();
+
+    // Tracks completed requests per node
+    private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>();
+
+    /**
+     * Creates the manager and the fixed-size thread pool on which queued requests are executed.
+     *
+     * @param numThreads number of threads in the request-sending pool
+     */
+    public RequestManager(int numThreads) {
+      ExecutorService localExecutor = Executors.newFixedThreadPool(numThreads,
+          new ThreadFactoryBuilder().setNameFormat("TaskCommunicator 
#%2d").build());
+      // Wrapped so that submit() returns a ListenableFuture that callbacks can be attached to.
+      executor = MoreExecutors.listeningDecorator(localExecutor);
+    }
+
+
+    // Per-pass scratch state, cleared at the start of every process() call.
+    // Caches disabled nodes for quicker lookups and ensures a request on a node which was
+    // skipped does not go out of order.
+    @VisibleForTesting
+    Set<LlapNodeId> currentLoopDisabledNodes = new HashSet<>();
+    @VisibleForTesting
+    List<CallableRequest> currentLoopSkippedRequests = new LinkedList<>();
+
+    /**
+     * Run loop of the manager: waits until a new request or a completion has been signalled,
+     * then runs one scheduling pass via {@link #process()}, repeating until shutdown.
+     *
+     * @return always {@code null}
+     * @throws RuntimeException if the loop is interrupted while the manager is not shut down
+     */
+    @Override
+    public Void call() {
+      while (!isShutdown.get()) {
+        lock.lock();
+        try {
+          // A single wait rather than a wait-until-true loop: any wake-up falls through to
+          // process(), which is harmless to run when nothing is queued.
+          if (!shouldRun.get()) {
+            queueCondition.await();
+          }
+          boolean shouldBreak = process();
+          if (shouldBreak) {
+            break;
+          }
+        } catch (InterruptedException e) {
+          if (isShutdown.get()) {
+            // Expected: shutdown interrupts the loop to make it exit.
+            break;
+          } else {
+            LOG.warn("RunLoop interrupted without being shutdown first");
+            // Restore the interrupt status cleared by await() before surfacing the failure.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+        } finally {
+          lock.unlock();
+        }
+      }
+      LOG.info("CallScheduler loop exiting");
+      return null;
+    }
+
+    /**
+     * Adds a new request to be executed and wakes the run loop.
+     *
+     * @param request the request to queue
+     */
+    public void queueRequest(CallableRequest request) {
+      synchronized (newRequestList) {
+        newRequestList.add(request);
+        // Set while holding the list monitor so the flag and the list change together.
+        shouldRun.set(true);
+      }
+      notifyRunLoop();
+    }
+
+    /**
+     * Records that a request has completed on a node and wakes the run loop. The node's
+     * running count is decremented later, when the run loop drains {@code completedNodes}.
+     *
+     * @param nodeId the node on which the request completed
+     */
+    public void requestFinished(LlapNodeId nodeId) {
+      synchronized (completedNodes) {
+        completedNodes.add(nodeId);
+        shouldRun.set(true);
+      }
+      notifyRunLoop();
+    }
+
+    /**
+     * Marks the manager as shut down. Only the first call has any effect: it stops the
+     * request executor via {@code shutdownNow()} and signals the run loop.
+     */
+    public void shutdown() {
+      // getAndSet returns the previous value, so this branch runs at most once.
+      if (!isShutdown.getAndSet(true)) {
+        executor.shutdownNow();
+        notifyRunLoop();
+      }
+    }
+
+    /**
+     * Hands a request to the executor and attaches a {@code ResponseCallback} that forwards
+     * the outcome to the request's callback and reports the completion for {@code nodeId}.
+     *
+     * @param request the request to execute
+     * @param nodeId the node the request targets
+     */
+    @VisibleForTesting
+    void submitToExecutor(CallableRequest request, LlapNodeId nodeId) {
+      // NOTE(review): the future is declared with SourceStateUpdatedResponseProto, yet this
+      // method receives a raw CallableRequest of any response type; the raw types are what
+      // let this compile. Confirm whether a generic signature is wanted.
+      ListenableFuture<SourceStateUpdatedResponseProto> future =
+          executor.submit(request);
+      Futures.addCallback(future, new ResponseCallback(request.getCallback(), 
nodeId, this));
+    }
+
+    /**
+     * Runs one scheduling pass: resets the per-pass state, drains newly queued requests and
+     * reported completions, then submits every pending request whose node can accept it.
+     * Requests that cannot run are put back at the head of {@code pendingRequests}.
+     *
+     * @return {@code true} if the manager is shut down and the run loop should exit,
+     *         {@code false} otherwise
+     */
+    @VisibleForTesting
+    boolean process() {
+      if (isShutdown.get()) {
+        return true;
+      }
+      currentLoopDisabledNodes.clear();
+      currentLoopSkippedRequests.clear();
+
+      // Set to false to block the next loop. This must be called before draining the lists,
+      // otherwise an add/completion after draining the lists but before setting it to false,
+      // will not trigger a run. May cause one unnecessary run if an add comes in before drain.
+      // drain list. add request (setTrue). setFalse needs to be avoided.
+      shouldRun.compareAndSet(true, false);
+      // Drain any calls which may have come in during the last execution of the loop.
+      drainNewRequestList();  // Locks newRequestList
+      drainCompletedNodes();  // Locks completedNodes
+
+
+      // Every request is removed from pendingRequests as it is visited; the ones that cannot
+      // run are collected in visiting order and re-inserted below.
+      Iterator<CallableRequest> iterator = pendingRequests.iterator();
+      while (iterator.hasNext()) {
+        CallableRequest request = iterator.next();
+        iterator.remove();
+        LlapNodeId nodeId = request.getNodeId();
+        if (canRunForNode(nodeId, currentLoopDisabledNodes)) {
+          submitToExecutor(request, nodeId);
+        } else {
+          // Once a node has a skipped request, later requests for it in this pass are skipped
+          // too (canRunForNode checks this set), so they are not sent ahead of it.
+          currentLoopDisabledNodes.add(nodeId);
+          currentLoopSkippedRequests.add(request);
+        }
+      }
+      // Tried scheduling everything that could be scheduled in this loop.
+      pendingRequests.addAll(0, currentLoopSkippedRequests);
+      return false;
+    }
+
+    /** Moves every request queued since the previous pass to the tail of pendingRequests. */
+    private void drainNewRequestList() {
+      synchronized (newRequestList) {
+        if (newRequestList.isEmpty()) {
+          return;
+        }
+        pendingRequests.addAll(newRequestList);
+        newRequestList.clear();
+      }
+    }
+
+    /** Decrements the running count once for each completion reported since the previous pass. */
+    private void drainCompletedNodes() {
+      synchronized (completedNodes) {
+        // Walking an empty list does nothing, so no separate emptiness check is needed.
+        Iterator<LlapNodeId> finished = completedNodes.iterator();
+        while (finished.hasNext()) {
+          LlapNodeId doneNode = finished.next();
+          runningRequests.get(doneNode).decrementAndGet();
+        }
+        completedNodes.clear();
+      }
+    }
+
+    /**
+     * Decides whether another request may be sent to a node now and, if so, counts it as
+     * running.
+     *
+     * @param nodeId the target node
+     * @param currentRunDisabledNodes nodes that already had a request skipped in this pass
+     * @return {@code true} if the node is not disabled and its running count, after being
+     *         incremented, does not exceed {@code maxConcurrentRequestsPerNode}; {@code false}
+     *         otherwise, in which case the count is left as it was
+     */
+    private boolean canRunForNode(LlapNodeId nodeId, Set<LlapNodeId> 
currentRunDisabledNodes) {
+      if (currentRunDisabledNodes.contains(nodeId)) {
+        return false;
+      } else {
+        AtomicInteger count = runningRequests.get(nodeId);
+        if (count == null) {
+          // First request seen for this node: install a counter, keeping any counter that
+          // was installed concurrently.
+          count = new AtomicInteger(0);
+          AtomicInteger old = runningRequests.putIfAbsent(nodeId, count);
+          count = old != null ? old : count;
+        }
+        // Increment first, then undo the increment if the limit would be exceeded.
+        if (count.incrementAndGet() <= maxConcurrentRequestsPerNode) {
+          return true;
+        } else {
+          count.decrementAndGet();
+          return false;
+        }
+      }
+    }
+
+    /** Signals {@code queueCondition} so that a run loop waiting on it wakes up. */
+    private void notifyRunLoop() {
+      // A Condition may only be signalled while its lock is held.
+      lock.lock();
+      try {
+        queueCondition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+
+  /**
+   * Future callback that forwards the outcome of a request to the caller-supplied
+   * {@link ExecuteRequestCallback} and then reports the completion for the node to the
+   * {@link RequestManager}.
+   */
+  private static final class ResponseCallback<TYPE extends Message>
+      implements FutureCallback<TYPE> {
+
+    private final ExecuteRequestCallback<TYPE> callback;
+    private final LlapNodeId nodeId;
+    private final RequestManager requestManager;
+
+    /**
+     * @param callback caller-supplied callback that receives the response or the error
+     * @param nodeId node the request was sent to
+     * @param requestManager manager to notify once the request has finished
+     */
+    public ResponseCallback(ExecuteRequestCallback<TYPE> callback, LlapNodeId 
nodeId,
+                            RequestManager requestManager) {
+      this.callback = callback;
+      this.nodeId = nodeId;
+      this.requestManager = requestManager;
+    }
+
+    @Override
+    public void onSuccess(TYPE result) {
+      try {
+        callback.setResponse(result);
+      } finally {
+        // In a finally block so the completion is reported even if the callback throws.
+        requestManager.requestFinished(nodeId);
+      }
+    }
+
+    @Override
+    public void onFailure(Throwable t) {
+      try {
+        callback.indicateError(t);
+      } finally {
+        // In a finally block so the completion is reported even if the callback throws.
+        requestManager.requestFinished(nodeId);
+      }
+    }
+  }
+
+  /**
+   * Base class for a queued request: bundles the target node, the request message and the
+   * callback to notify. Subclasses implement {@link #call()} to perform the actual call.
+   *
+   * NOTE(review): implements the raw {@code Callable} type rather than
+   * {@code Callable<RESPONSE>} - confirm whether that is intentional.
+   */
+  @VisibleForTesting
+  static abstract class CallableRequest<REQUEST extends Message, RESPONSE 
extends Message>
+      implements Callable {
+
+    final LlapNodeId nodeId;
+    final ExecuteRequestCallback<RESPONSE> callback;
+    final REQUEST request;
+
+
+    /**
+     * @param nodeId node the request is sent to
+     * @param request message to send
+     * @param callback callback that receives the response or the error
+     */
+    protected CallableRequest(LlapNodeId nodeId, REQUEST request, 
ExecuteRequestCallback<RESPONSE> callback) {
+      this.nodeId = nodeId;
+      this.request = request;
+      this.callback = callback;
+    }
+
+    /** @return the node this request targets */
+    public LlapNodeId getNodeId() {
+      return nodeId;
+    }
+
+    /** @return the callback to notify with the outcome */
+    public ExecuteRequestCallback<RESPONSE> getCallback() {
+      return callback;
+    }
+
+    /** Performs the call and returns its response. */
+    public abstract RESPONSE call() throws Exception;
+  }
+
+  /**
+   * Request that invokes {@code submitWork} on the proxy for its node. A non-static inner
+   * class because {@code call()} uses the enclosing instance's {@code getProxy}.
+   */
+  private class SubmitWorkCallable extends 
CallableRequest<SubmitWorkRequestProto, SubmitWorkResponseProto> {
+
+    protected SubmitWorkCallable(LlapNodeId nodeId,
+                          SubmitWorkRequestProto submitWorkRequestProto,
+                                 
ExecuteRequestCallback<SubmitWorkResponseProto> callback) {
+      super(nodeId, submitWorkRequestProto, callback);
+    }
+
+    @Override
+    public SubmitWorkResponseProto call() throws Exception {
+      // NOTE(review): first argument is passed as null; presumably the RPC controller - confirm.
+      return getProxy(nodeId).submitWork(null, request);
+    }
+  }
+
+  /**
+   * Request that invokes {@code sourceStateUpdated} on the proxy for its node. A non-static
+   * inner class because {@code call()} uses the enclosing instance's {@code getProxy}.
+   */
+  private class SendSourceStateUpdateCallable
+      extends CallableRequest<SourceStateUpdatedRequestProto, 
SourceStateUpdatedResponseProto> {
+
+    public SendSourceStateUpdateCallable(LlapNodeId nodeId,
+                                         SourceStateUpdatedRequestProto 
request,
+                                         
ExecuteRequestCallback<SourceStateUpdatedResponseProto> callback) {
+      super(nodeId, request, callback);
+    }
+
+    @Override
+    public SourceStateUpdatedResponseProto call() throws Exception {
+      // NOTE(review): first argument is passed as null; presumably the RPC controller - confirm.
+      return getProxy(nodeId).sourceStateUpdated(null, request);
+    }
+  }
+
+  /**
+   * Request that invokes {@code queryComplete} on the proxy for its node. A non-static inner
+   * class because {@code call()} uses the enclosing instance's {@code getProxy}.
+   */
+  private class SendQueryCompleteCallable
+      extends CallableRequest<QueryCompleteRequestProto, 
QueryCompleteResponseProto> {
+
+    protected SendQueryCompleteCallable(LlapNodeId nodeId,
+                                        QueryCompleteRequestProto 
queryCompleteRequestProto,
+                                        
ExecuteRequestCallback<QueryCompleteResponseProto> callback) {
+      super(nodeId, queryCompleteRequestProto, callback);
+    }
+
+    @Override
+    public QueryCompleteResponseProto call() throws Exception {
+      // NOTE(review): first argument is passed as null; presumably the RPC controller - confirm.
+      return getProxy(nodeId).queryComplete(null, request);
+    }
+  }
+
+  /**
+   * Request that invokes {@code terminateFragment} on the proxy for its node. A non-static
+   * inner class because {@code call()} uses the enclosing instance's {@code getProxy}.
+   */
+  private class SendTerminateFragmentCallable
+      extends CallableRequest<TerminateFragmentRequestProto, 
TerminateFragmentResponseProto> {
+
+    protected SendTerminateFragmentCallable(LlapNodeId nodeId,
+                                            TerminateFragmentRequestProto 
terminateFragmentRequestProto,
+                                            
ExecuteRequestCallback<TerminateFragmentResponseProto> callback) {
+      super(nodeId, terminateFragmentRequestProto, callback);
+    }
+
+    @Override
+    public TerminateFragmentResponseProto call() throws Exception {
+      // NOTE(review): first argument is passed as null; presumably the RPC controller - confirm.
+      return getProxy(nodeId).terminateFragment(null, request);
+    }
+  }
+
+  /**
+   * Callback through which the outcome of a queued request is delivered.
+   *
+   * @param <T> type of the response message
+   */
+  public interface ExecuteRequestCallback<T extends Message> {
+    /** Called with the response when the request succeeds. */
+    void setResponse(T response);
+    /** Called with the failure when the request does not succeed. */
+    void indicateError(Throwable t);
+  }
+
+  /**
+   * Returns the client for a node, creating and caching one on first use. The cache key is
+   * the node's {@code hostname:port} string.
+   *
+   * @param nodeId the node to talk to
+   * @return the cached client, or a newly created one
+   * @throws RuntimeException if the current user cannot be determined
+   */
+  private LlapProtocolBlockingPB getProxy(final LlapNodeId nodeId) {
+    String hostId = getHostIdentifier(nodeId.getHostname(), nodeId.getPort());
+
+    LlapProtocolBlockingPB proxy = hostProxies.get(hostId);
+    if (proxy == null) {
+      if (llapToken == null) {
+        proxy = new LlapProtocolClientImpl(getConfig(), nodeId.getHostname(),
+            nodeId.getPort(), retryPolicy, socketFactory);
+      } else {
+        UserGroupInformation ugi;
+        try {
+          ugi = UserGroupInformation.getCurrentUser();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        // Work on a copy of the token whose service is set to this node's address, add it
+        // to the current user, and create the client while running as that user.
+        Token<LlapTokenIdentifier> nodeToken = new 
Token<LlapTokenIdentifier>(llapToken);
+        SecurityUtil.setTokenService(nodeToken, 
NetUtils.createSocketAddrForHost(
+            nodeId.getHostname(), nodeId.getPort()));
+        ugi.addToken(nodeToken);
+        proxy = ugi.doAs(new PrivilegedAction<LlapProtocolBlockingPB>() {
+          @Override
+          public LlapProtocolBlockingPB run() {
+           return new LlapProtocolClientImpl(getConfig(), nodeId.getHostname(),
+               nodeId.getPort(), retryPolicy, socketFactory);
+          }
+        });
+      }
+
+      // If another thread cached a client for this node first, use that one; the client
+      // created here is then dropped without being shut down (see TODO).
+      LlapProtocolBlockingPB proxyOld = hostProxies.putIfAbsent(hostId, proxy);
+      if (proxyOld != null) {
+        // TODO Shutdown the new proxy.
+        proxy = proxyOld;
+      }
+    }
+    return proxy;
+  }
+
+  /**
+   * Builds the key under which a node's client is cached.
+   *
+   * @param hostname hostname of the node
+   * @param port port of the node
+   * @return {@code hostname:port}
+   */
+  private String getHostIdentifier(String hostname, int port) {
+    return hostname + ":" + port;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java
 
b/llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java
new file mode 100644
index 0000000..850b67a
--- /dev/null
+++ 
b/llap-client/src/test/org/apache/hadoop/hive/llap/tez/TestLlapDaemonProtocolClientProxy.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.tez;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.protobuf.Message;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the request bookkeeping of {@link LlapProtocolClientProxy.RequestManager}.
+ *
+ * The tests drive the manager through {@link RequestManagerForTest}, which only counts
+ * submissions instead of handing them to an executor.
+ */
+public class TestLlapDaemonProtocolClientProxy {
+
+  /**
+   * Queues one request for each of two different nodes and checks that a single
+   * {@code process()} pass submits both, with nothing skipped and no node disabled.
+   */
+  @Test(timeout = 5000)
+  public void testMultipleNodes() {
+    RequestManagerForTest requestManager = new RequestManagerForTest(1);
+
+    LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
+    LlapNodeId nodeId2 = LlapNodeId.getInstance("host2", 1025);
+
+    Message mockMessage = mock(Message.class);
+    LlapProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
+        LlapProtocolClientProxy.ExecuteRequestCallback.class);
+
+    // Request two messages
+    requestManager.queueRequest(
+        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+    requestManager.queueRequest(
+        new CallableRequestForTest(nodeId2, mockMessage, mockExecuteRequestCallback));
+
+    // Should go through in a single process call
+    requestManager.process();
+    assertEquals(2, requestManager.numSubmissionsCounters);
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId2));
+    Assert.assertEquals(1,
+        requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+    Assert.assertEquals(1,
+        requestManager.numInvocationsPerNode.get(nodeId2).getValue().intValue());
+    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+    assertEquals(0, requestManager.currentLoopDisabledNodes.size());
+  }
+
+  /**
+   * Checks that a second request for a node is held back while the first one is still
+   * outstanding, and that it is submitted once {@code requestFinished} is reported
+   * for that node.
+   */
+  @Test(timeout = 5000)
+  public void testSingleInvocationPerNode() {
+    RequestManagerForTest requestManager = new RequestManagerForTest(1);
+
+    LlapNodeId nodeId1 = LlapNodeId.getInstance("host1", 1025);
+
+    Message mockMessage = mock(Message.class);
+    LlapProtocolClientProxy.ExecuteRequestCallback mockExecuteRequestCallback = mock(
+        LlapProtocolClientProxy.ExecuteRequestCallback.class);
+
+    // First request for host.
+    requestManager.queueRequest(
+        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+    requestManager.process();
+    assertEquals(1, requestManager.numSubmissionsCounters);
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+    Assert.assertEquals(1,
+        requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+
+    // Second request for host. Single invocation since the last has not completed.
+    requestManager.queueRequest(
+        new CallableRequestForTest(nodeId1, mockMessage, mockExecuteRequestCallback));
+    requestManager.process();
+    assertEquals(1, requestManager.numSubmissionsCounters);
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+    Assert.assertEquals(1,
+        requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+    assertEquals(1, requestManager.currentLoopSkippedRequests.size());
+    assertEquals(1, requestManager.currentLoopDisabledNodes.size());
+    assertTrue(requestManager.currentLoopDisabledNodes.contains(nodeId1));
+
+    // Complete first request. Second pending request should go through.
+    requestManager.requestFinished(nodeId1);
+    requestManager.process();
+    assertEquals(2, requestManager.numSubmissionsCounters);
+    assertNotNull(requestManager.numInvocationsPerNode.get(nodeId1));
+    Assert.assertEquals(2,
+        requestManager.numInvocationsPerNode.get(nodeId1).getValue().intValue());
+    assertEquals(0, requestManager.currentLoopSkippedRequests.size());
+    assertEquals(0, requestManager.currentLoopDisabledNodes.size());
+    assertFalse(requestManager.currentLoopDisabledNodes.contains(nodeId1));
+  }
+
+
+  /**
+   * Request manager whose {@code submitToExecutor} records each submission (in total and
+   * per node) and does nothing else, so tests can assert on what would have been submitted.
+   */
+  static class RequestManagerForTest extends LlapProtocolClientProxy.RequestManager {
+
+    /** Total number of {@code submitToExecutor} calls since construction or {@link #reset()}. */
+    int numSubmissionsCounters = 0;
+    /** Number of {@code submitToExecutor} calls per target node. */
+    private final Map<LlapNodeId, MutableInt> numInvocationsPerNode = new HashMap<>();
+
+    public RequestManagerForTest(int numThreads) {
+      super(numThreads);
+    }
+
+    // The parameter type is kept raw on purpose: it has to match the signature of the
+    // method being overridden. @Override makes the compiler reject any drift between
+    // the two, which would otherwise leave the counters permanently at zero.
+    @Override
+    protected void submitToExecutor(LlapProtocolClientProxy.CallableRequest request,
+        LlapNodeId nodeId) {
+      numSubmissionsCounters++;
+      MutableInt nodeCount = numInvocationsPerNode.get(nodeId);
+      if (nodeCount == null) {
+        nodeCount = new MutableInt(0);
+        numInvocationsPerNode.put(nodeId, nodeCount);
+      }
+      nodeCount.increment();
+    }
+
+    /** Clears the total and the per-node submission counters. */
+    void reset() {
+      numSubmissionsCounters = 0;
+      numInvocationsPerNode.clear();
+    }
+
+  }
+
+  /** Minimal request used as queue payload; {@link #call()} returns {@code null}. */
+  static class CallableRequestForTest
+      extends LlapProtocolClientProxy.CallableRequest<Message, Message> {
+
+    protected CallableRequestForTest(LlapNodeId nodeId, Message message,
+        LlapProtocolClientProxy.ExecuteRequestCallback<Message> callback) {
+      super(nodeId, message, callback);
+    }
+
+    @Override
+    public Message call() throws Exception {
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/4185d9b8/llap-common/pom.xml
----------------------------------------------------------------------
diff --git a/llap-common/pom.xml b/llap-common/pom.xml
new file mode 100644
index 0000000..5343479
--- /dev/null
+++ b/llap-common/pom.xml
@@ -0,0 +1,235 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <!-- Inherits from the top-level Hive pom one directory up. -->
+  <parent>
+    <groupId>org.apache.hive</groupId>
+    <artifactId>hive</artifactId>
+    <version>2.1.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <!-- Coordinates of this module; groupId and version come from the parent. -->
+  <artifactId>hive-llap-common</artifactId>
+  <packaging>jar</packaging>
+  <name>Hive Llap Common</name>
+
+  <properties>
+    <!-- NOTE(review): presumably read by the parent pom to locate the repository root; confirm. -->
+    <hive.path.to.root>..</hive.path.to.root>
+  </properties>
+
+  <dependencies>
+    <!-- dependencies are always listed in sorted order by groupId, artifactId -->
+    <!-- intra-project -->
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <!-- inter-project -->
+    <!--
+      Every Hadoop/Tez dependency below excludes slf4j-log4j12 and commons-logging.
+      An exclusion only takes effect when both groupId and artifactId match the
+      transitive artifact, so the commons-logging groupId must be spelled exactly
+      "commons-logging".
+    -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-runtime-internals</artifactId>
+      <version>${tez.version}</version>
+      <optional>true</optional>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- test inter-project -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito-all.version}</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <!--
+      Regenerates the Java sources for LlapDaemonProtocol.proto.
+      The profile declares no activation, so it only runs when selected
+      explicitly (for example with -Pprotobuf).
+    -->
+    <profile>
+      <id>protobuf</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-antrun-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>generate-protobuf-sources</id>
+                <phase>generate-sources</phase>
+                <configuration>
+                  <target>
+                    <!-- Input: src/protobuf; output: src/gen/protobuf/gen-java (both under the module basedir). -->
+                    <property name="protobuf.src.dir"  
location="${basedir}/src/protobuf"/>
+                    <property name="protobuf.build.dir"  
location="${basedir}/src/gen/protobuf/gen-java"/>
+                    <echo>Building LLAP Protobuf</echo>
+                    <mkdir dir="${protobuf.build.dir}"/>
+                    <!-- Invokes the "protoc" executable; failonerror makes a failed protoc run fail the build. -->
+                    <exec executable="protoc" failonerror="true">
+                      <arg value="--java_out=${protobuf.build.dir}"/>
+                      <arg value="-I=${protobuf.src.dir}"/>
+                      <arg 
value="${protobuf.src.dir}/LlapDaemonProtocol.proto"/>
+                    </exec>
+                  </target>
+                </configuration>
+                <goals>
+                  <goal>run</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+          <!-- NOTE(review): the same test-jar execution is also declared in the main build section of this pom; confirm this copy is needed. -->
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>test-jar</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+
+  <build>
+    <!-- Non-default layout: main sources live in src/java and tests in src/test. -->
+    <sourceDirectory>${basedir}/src/java</sourceDirectory>
+    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
+    <!-- Resources are copied unfiltered; Python sources and compiled Python files directly in the resource directory are left out. -->
+    <!-- NOTE(review): the resource directory uses the src/main layout while sources use src/java; confirm the directory exists in this module. -->
+    <resources>
+    <resource>
+      <directory>src/main/resources</directory>
+      <excludes>
+        <exclude>*.py</exclude>
+        <exclude>*.pyc</exclude>
+      </excludes>
+      <filtering>false</filtering>
+    </resource>
+    </resources>
+    <plugins>
+      <!-- Adds the generated-source directories to the compile source roots during generate-sources. -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>src/gen/protobuf/gen-java</source>
+                <!-- NOTE(review): no thrift generation is configured in this pom; confirm this directory is used by the module. -->
+                <source>src/gen/thrift/gen-javabean</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <!-- Also packages the test classes as a separate test-jar artifact. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

Reply via email to