Repository: hive
Updated Branches:
  refs/heads/master 59e6c83fd -> 0d69a88b9


HIVE-13827 : LLAPIF: authentication on the output channel (Sergey Shelukhin, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0d69a88b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0d69a88b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0d69a88b

Branch: refs/heads/master
Commit: 0d69a88b9f317e93194e08aee409f0b1b6ccab7c
Parents: 59e6c83
Author: Sergey Shelukhin <ser...@apache.org>
Authored: Mon Jun 13 18:57:35 2016 -0700
Committer: Sergey Shelukhin <ser...@apache.org>
Committed: Mon Jun 13 18:57:35 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../daemon/rpc/LlapDaemonProtocolProtos.java    | 665 ++++++++++++++++++-
 .../hive/llap/security/SecretManager.java       |  11 +
 .../apache/hadoop/hive/llap/tez/Converters.java |  13 +
 .../src/protobuf/LlapDaemonProtocol.proto       |   6 +
 .../hadoop/hive/llap/LlapBaseInputFormat.java   |  32 +-
 .../hive/llap/daemon/impl/LlapDaemon.java       |  11 +-
 .../daemon/impl/LlapProtocolServerImpl.java     |  19 +-
 .../impl/TestLlapDaemonProtocolServerImpl.java  |   2 +-
 .../hadoop/hive/llap/LlapOutputFormat.java      |   3 -
 .../hive/llap/LlapOutputFormatService.java      | 141 ++--
 .../hive/ql/exec/tez/MapRecordProcessor.java    |  18 +-
 .../apache/hadoop/hive/ql/plan/PlanUtils.java   |  11 +-
 .../ql/udf/generic/GenericUDTFGetSplits.java    | 204 +++---
 .../hadoop/hive/llap/TestLlapOutputFormat.java  |  60 +-
 15 files changed, 971 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 285caa3..761dbb2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2883,6 +2883,10 @@ public class HiveConf extends Configuration {
         "protocol or ZK paths), similar to how ssh refuses a key with bad 
access permissions."),
     LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 
15003,
         "LLAP daemon output service port"),
+    
LLAP_DAEMON_OUTPUT_STREAM_TIMEOUT("hive.llap.daemon.output.stream.timeout", 
"120s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "The timeout for the client to connect to LLAP output service and 
start the fragment\n" +
+        "output after sending the fragment. The fragment will fail if its 
output is not claimed."),
     
LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size",
         128 * 1024, "Send buffer size to be used by LLAP daemon output 
service"),
     LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", 
false,
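
The new setting is read as a time value. A minimal sketch of how a consumer reads it (this mirrors the LlapOutputFormatService change later in this commit; "conf" is assumed to be the daemon's Configuration):

    // Sketch only: read hive.llap.daemon.output.stream.timeout in milliseconds.
    static long getOutputStreamTimeoutMs(org.apache.hadoop.conf.Configuration conf) {
      return org.apache.hadoop.hive.conf.HiveConf.getTimeVar(
          conf,
          org.apache.hadoop.hive.conf.HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_STREAM_TIMEOUT,
          java.util.concurrent.TimeUnit.MILLISECONDS);
    }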

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
----------------------------------------------------------------------
diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
index 856ea30..56a1361 100644
--- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
+++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java
@@ -16441,6 +16441,624 @@ public final class LlapDaemonProtocolProtos {
     // @@protoc_insertion_point(class_scope:GetTokenResponseProto)
   }
 
+  public interface LlapOutputSocketInitMessageOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // required string fragment_id = 1;
+    /**
+     * <code>required string fragment_id = 1;</code>
+     */
+    boolean hasFragmentId();
+    /**
+     * <code>required string fragment_id = 1;</code>
+     */
+    java.lang.String getFragmentId();
+    /**
+     * <code>required string fragment_id = 1;</code>
+     */
+    com.google.protobuf.ByteString
+        getFragmentIdBytes();
+
+    // optional bytes token = 2;
+    /**
+     * <code>optional bytes token = 2;</code>
+     */
+    boolean hasToken();
+    /**
+     * <code>optional bytes token = 2;</code>
+     */
+    com.google.protobuf.ByteString getToken();
+  }
+  /**
+   * Protobuf type {@code LlapOutputSocketInitMessage}
+   *
+   * <pre>
+   * The message sent by external client to claim the output from the output 
socket.
+   * </pre>
+   */
+  public static final class LlapOutputSocketInitMessage extends
+      com.google.protobuf.GeneratedMessage
+      implements LlapOutputSocketInitMessageOrBuilder {
+    // Use LlapOutputSocketInitMessage.newBuilder() to construct.
+    private 
LlapOutputSocketInitMessage(com.google.protobuf.GeneratedMessage.Builder<?> 
builder) {
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private LlapOutputSocketInitMessage(boolean noInit) { this.unknownFields = 
com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final LlapOutputSocketInitMessage defaultInstance;
+    public static LlapOutputSocketInitMessage getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public LlapOutputSocketInitMessage getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private LlapOutputSocketInitMessage(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              bitField0_ |= 0x00000001;
+              fragmentId_ = input.readBytes();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              token_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.class,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<LlapOutputSocketInitMessage> 
PARSER =
+        new com.google.protobuf.AbstractParser<LlapOutputSocketInitMessage>() {
+      public LlapOutputSocketInitMessage parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new LlapOutputSocketInitMessage(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<LlapOutputSocketInitMessage> 
getParserForType() {
+      return PARSER;
+    }
+
+    private int bitField0_;
+    // required string fragment_id = 1;
+    public static final int FRAGMENT_ID_FIELD_NUMBER = 1;
+    private java.lang.Object fragmentId_;
+    /**
+     * <code>required string fragment_id = 1;</code>
+     */
+    public boolean hasFragmentId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>required string fragment_id = 1;</code>
+     */
+    public java.lang.String getFragmentId() {
+      java.lang.Object ref = fragmentId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          fragmentId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>required string fragment_id = 1;</code>
+     */
+    public com.google.protobuf.ByteString
+        getFragmentIdBytes() {
+      java.lang.Object ref = fragmentId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        fragmentId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional bytes token = 2;
+    public static final int TOKEN_FIELD_NUMBER = 2;
+    private com.google.protobuf.ByteString token_;
+    /**
+     * <code>optional bytes token = 2;</code>
+     */
+    public boolean hasToken() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional bytes token = 2;</code>
+     */
+    public com.google.protobuf.ByteString getToken() {
+      return token_;
+    }
+
+    private void initFields() {
+      fragmentId_ = "";
+      token_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      if (!hasFragmentId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(1, getFragmentIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, token_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getFragmentIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, token_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage))
 {
+        return super.equals(obj);
+      }
+      
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 other = 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage)
 obj;
+
+      boolean result = true;
+      result = result && (hasFragmentId() == other.hasFragmentId());
+      if (hasFragmentId()) {
+        result = result && getFragmentId()
+            .equals(other.getFragmentId());
+      }
+      result = result && (hasToken() == other.hasToken());
+      if (hasToken()) {
+        result = result && getToken()
+            .equals(other.getToken());
+      }
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasFragmentId()) {
+        hash = (37 * hash) + FRAGMENT_ID_FIELD_NUMBER;
+        hash = (53 * hash) + getFragmentId().hashCode();
+      }
+      if (hasToken()) {
+        hash = (37 * hash) + TOKEN_FIELD_NUMBER;
+        hash = (53 * hash) + getToken().hashCode();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code LlapOutputSocketInitMessage}
+     *
+     * <pre>
+     * The message sent by external client to claim the output from the output 
socket.
+     * </pre>
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessageOrBuilder
 {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.class,
 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.Builder.class);
+      }
+
+      // Construct using 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        fragmentId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        token_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_descriptor;
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 getDefaultInstanceForType() {
+        return 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.getDefaultInstance();
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 build() {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 buildPartial() {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 result = new 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.fragmentId_ = fragmentId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.token_ = token_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage)
 {
+          return 
mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder 
mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 other) {
+        if (other == 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.getDefaultInstance())
 return this;
+        if (other.hasFragmentId()) {
+          bitField0_ |= 0x00000001;
+          fragmentId_ = other.fragmentId_;
+          onChanged();
+        }
+        if (other.hasToken()) {
+          setToken(other.getToken());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        if (!hasFragmentId()) {
+          
+          return false;
+        }
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage
 parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = 
(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage)
 e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // required string fragment_id = 1;
+      private java.lang.Object fragmentId_ = "";
+      /**
+       * <code>required string fragment_id = 1;</code>
+       */
+      public boolean hasFragmentId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>required string fragment_id = 1;</code>
+       */
+      public java.lang.String getFragmentId() {
+        java.lang.Object ref = fragmentId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          fragmentId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>required string fragment_id = 1;</code>
+       */
+      public com.google.protobuf.ByteString
+          getFragmentIdBytes() {
+        java.lang.Object ref = fragmentId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          fragmentId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>required string fragment_id = 1;</code>
+       */
+      public Builder setFragmentId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        fragmentId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string fragment_id = 1;</code>
+       */
+      public Builder clearFragmentId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        fragmentId_ = getDefaultInstance().getFragmentId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>required string fragment_id = 1;</code>
+       */
+      public Builder setFragmentIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        fragmentId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional bytes token = 2;
+      private com.google.protobuf.ByteString token_ = 
com.google.protobuf.ByteString.EMPTY;
+      /**
+       * <code>optional bytes token = 2;</code>
+       */
+      public boolean hasToken() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional bytes token = 2;</code>
+       */
+      public com.google.protobuf.ByteString getToken() {
+        return token_;
+      }
+      /**
+       * <code>optional bytes token = 2;</code>
+       */
+      public Builder setToken(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        token_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bytes token = 2;</code>
+       */
+      public Builder clearToken() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        token_ = getDefaultInstance().getToken();
+        onChanged();
+        return this;
+      }
+
+      // @@protoc_insertion_point(builder_scope:LlapOutputSocketInitMessage)
+    }
+
+    static {
+      defaultInstance = new LlapOutputSocketInitMessage(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:LlapOutputSocketInitMessage)
+  }
+
   /**
    * Protobuf service {@code LlapDaemonProtocol}
    */
@@ -17218,6 +17836,11 @@ public final class LlapDaemonProtocolProtos {
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_GetTokenResponseProto_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_LlapOutputSocketInitMessage_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_LlapOutputSocketInitMessage_fieldAccessorTable;
 
   public static com.google.protobuf.Descriptors.FileDescriptor
       getDescriptor() {
@@ -17286,23 +17909,25 @@ public final class LlapDaemonProtocolProtos {
       "Proto\022\"\n\032fragment_identifier_string\030\002 \001(" +
       "\t\" \n\036TerminateFragmentResponseProto\"&\n\024G" +
       "etTokenRequestProto\022\016\n\006app_id\030\001 \001(\t\"&\n\025G" +
-      "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S",
-      "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" +
-      "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" +
-      
"TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316"
 +
-      "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" +
-      "bmitWorkRequestProto\032\030.SubmitWorkRespons" +
-      "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" +
-      "teUpdatedRequestProto\032 .SourceStateUpdat" +
-      "edResponseProto\022H\n\rqueryComplete\022\032.Query" +
-      "CompleteRequestProto\032\033.QueryCompleteResp" +
-      "onseProto\022T\n\021terminateFragment\022\036.Termina",
-      "teFragmentRequestProto\032\037.TerminateFragme" +
-      "ntResponseProto2]\n\026LlapManagementProtoco" +
-      "l\022C\n\022getDelegationToken\022\025.GetTokenReques" +
-      "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" +
-      "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" +
-      "emonProtocolProtos\210\001\001\240\001\001"
+      "etTokenResponseProto\022\r\n\005token\030\001 \001(\014\"A\n\033L",
+      "lapOutputSocketInitMessage\022\023\n\013fragment_i" +
+      "d\030\001 \002(\t\022\r\n\005token\030\002 
\001(\014*2\n\020SourceStatePro" +
+      
"to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Su" +
+      "bmissionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJE" +
+      "CTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemon" 
+
+      "Protocol\022?\n\nsubmitWork\022\027.SubmitWorkReque" +
+      "stProto\032\030.SubmitWorkResponseProto\022W\n\022sou" +
+      "rceStateUpdated\022\037.SourceStateUpdatedRequ" +
+      "estProto\032 .SourceStateUpdatedResponsePro" +
+      "to\022H\n\rqueryComplete\022\032.QueryCompleteReque",
+      "stProto\032\033.QueryCompleteResponseProto\022T\n\021" +
+      "terminateFragment\022\036.TerminateFragmentReq" +
+      "uestProto\032\037.TerminateFragmentResponsePro" +
+      "to2]\n\026LlapManagementProtocol\022C\n\022getDeleg" +
+      "ationToken\022\025.GetTokenRequestProto\032\026.GetT" +
+      "okenResponseProtoBH\n&org.apache.hadoop.h" +
+      "ive.llap.daemon.rpcB\030LlapDaemonProtocolP" +
+      "rotos\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -17429,6 +18054,12 @@ public final class LlapDaemonProtocolProtos {
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_GetTokenResponseProto_descriptor,
               new java.lang.String[] { "Token", });
+          internal_static_LlapOutputSocketInitMessage_descriptor =
+            getDescriptor().getMessageTypes().get(20);
+          internal_static_LlapOutputSocketInitMessage_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_LlapOutputSocketInitMessage_descriptor,
+              new java.lang.String[] { "FragmentId", "Token", });
           return null;
         }
       };

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index 58a8e96..f06e963 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -44,6 +44,8 @@ import org.apache.zookeeper.data.Id;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.protobuf.ByteString;
+
 public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier>
   implements SigningSecretManager {
   private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class);
@@ -272,4 +274,13 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
      }
     }
   }
+
+  /** Verifies the token available as serialized bytes. */
+  public void verifyToken(byte[] tokenBytes) throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) return;
+    if (tokenBytes == null) throw new SecurityException("Token required for authentication");
+    Token<LlapTokenIdentifier> token = new Token<>();
+    token.readFields(new DataInputStream(new ByteArrayInputStream(tokenBytes)));
+    verifyToken(token.decodeIdentifier(), token.getPassword());
+  }
 }
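
The serialized form expected here is a Hadoop Writable-serialized Token. A minimal sketch of the matching client-side serialization (an assumed helper, not part of this patch):

    // Sketch only: produce the byte[] that verifyToken() reads back via Token.readFields().
    static byte[] serializeToken(org.apache.hadoop.security.token.Token<?> token)
        throws java.io.IOException {
      java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
      token.write(new java.io.DataOutputStream(bos));
      return bos.toByteArray();
    }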

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
index dad5e07..01dc2e1 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskContext;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
@@ -103,6 +104,18 @@ public class Converters {
             attemptNum);
   }
 
+  public static TezTaskAttemptID createTaskAttemptId(TaskContext ctx) {
+    // Come ride the API roller-coaster #2! The best part is that ctx has TezTaskAttemptID inside.
+    return TezTaskAttemptID.getInstance(
+            TezTaskID.getInstance(
+                TezVertexID.getInstance(
+                    TezDAGID.getInstance(
+                        ctx.getApplicationId(),
+                    ctx.getDagIdentifier()),
+                ctx.getTaskVertexIndex()),
+            ctx.getTaskIndex()),
+        ctx.getTaskAttemptNumber());
+  }
   public static VertexIdentifier createVertexIdentifier(
       TezTaskAttemptID taId, int appAttemptId) {
     VertexIdentifier.Builder idBuilder = VertexIdentifier.newBuilder();
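
The new overload lets the task side rebuild the same attempt id that the external reader derives from the signed vertex spec (see LlapBaseInputFormat below), so both ends agree on the fragment id used to claim the output. A minimal sketch of the task-side call, assuming a Tez TaskContext named "taskContext" is in scope:

    // Sketch only: derive the fragment id string on the task side.
    String fragmentId = Converters.createTaskAttemptId(taskContext).toString();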

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-common/src/protobuf/LlapDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto
index 45d1808..92dda21 100644
--- a/llap-common/src/protobuf/LlapDaemonProtocol.proto
+++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto
@@ -180,6 +180,12 @@ message GetTokenResponseProto {
   optional bytes token = 1;
 }
 
+// The message sent by external client to claim the output from the output socket.
+message LlapOutputSocketInitMessage {
+  required string fragment_id = 1;
+  optional bytes token = 2;
+}
+
 service LlapDaemonProtocol {
   rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto);
   rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto);
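
On the wire, the client writes this message with a varint32 length prefix (writeDelimitedTo, see LlapBaseInputFormat below) and the daemon decodes it with Netty's protobuf decoders (see LlapOutputFormatService below). A minimal blocking-IO sketch of the receiving side, assuming "in" is the accepted socket's java.io.InputStream:

    // Sketch only: read the length-delimited handshake message without Netty.
    LlapOutputSocketInitMessage init = LlapOutputSocketInitMessage.parseDelimitedFrom(in);
    String fragmentId = init.getFragmentId();
    byte[] tokenBytes = init.hasToken() ? init.getToken().toByteArray() : null;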

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 03b2a54..4d17080 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -16,10 +16,10 @@
  */
 package org.apache.hadoop.hive.llap;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -37,6 +37,7 @@ import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary;
@@ -48,7 +49,6 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -70,13 +70,11 @@ import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
 import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 
 
@@ -114,6 +112,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
   public LlapBaseInputFormat() {}
 
 
+  @SuppressWarnings("unchecked")
   @Override
   public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
 
@@ -148,24 +147,28 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     llapClient.init(job);
     llapClient.start();
 
-    SubmitWorkRequestProto submitWorkRequestProto = constructSubmitWorkRequestProto(
+    SubmitWorkRequestProto request = constructSubmitWorkRequestProto(
         submitWorkInfo, llapSplit.getSplitNum(), llapClient.getAddress(),
         submitWorkInfo.getToken(), llapSplit.getFragmentBytes(),
         llapSplit.getFragmentBytesSignature());
-    llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort);
+    llapClient.submitWork(request, host, llapSubmitPort);
 
-    String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
-
-    // TODO: security for output channel
     Socket socket = new Socket(host, serviceInstance.getOutputFormatPort());
 
     LOG.debug("Socket connected");
+    SignableVertexSpec vertex = SignableVertexSpec.parseFrom(submitWorkInfo.getVertexBinary());
+    String fragmentId = Converters.createTaskAttemptId(vertex.getVertexIdentifier(),
+        request.getFragmentNumber(), request.getAttemptNumber()).toString();
+    OutputStream socketStream = socket.getOutputStream();
+    LlapOutputSocketInitMessage.Builder builder =
+        LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId);
+    if (llapSplit.getTokenBytes() != null) {
+      builder.setToken(ByteString.copyFrom(llapSplit.getTokenBytes()));
+    }
+    builder.build().writeDelimitedTo(socketStream);
+    socketStream.flush();
 
-    socket.getOutputStream().write(id.getBytes());
-    socket.getOutputStream().write(0);
-    socket.getOutputStream().flush();
-
-    LOG.info("Registered id: " + id);
+    LOG.info("Registered id: " + fragmentId);
 
     @SuppressWarnings("rawtypes")
     LlapBaseRecordReader recordReader = new LlapBaseRecordReader(
@@ -291,7 +294,6 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     String user = System.getenv(ApplicationConstants.Environment.USER.name());
     LOG.info("Setting user in submitWorkRequest to: " + user);
 
-    // TODO: this is bogus. What does LLAP use this for?
     ContainerId containerId =
         ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum);
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index c1ef0f4..c7e9d32 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem;
 import org.apache.hadoop.hive.llap.metrics.MetricsUtils;
 import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
 import org.apache.hadoop.hive.llap.security.LlapUgiFactoryFactory;
+import org.apache.hadoop.hive.llap.security.SecretManager;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.UDF;
@@ -88,6 +89,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
   public static final String LLAP_HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2-llapdaemon.properties";
   public static final String HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2.properties";
   private final Configuration shuffleHandlerConf;
+  private final SecretManager secretManager;
   private final LlapProtocolServerImpl server;
   private final ContainerRunnerImpl containerRunner;
   private final AMReporter amReporter;
@@ -245,7 +247,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
 
     this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf);
 
-    this.server = new LlapProtocolServerImpl(
+    SecretManager sm = null;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      sm = SecretManager.createSecretManager(daemonConf, daemonId.getClusterString());
+    }
+    this.secretManager = sm;
+    this.server = new LlapProtocolServerImpl(secretManager,
         numHandlers, this, srvAddress, mngAddress, srvPort, mngPort, daemonId);
 
     UgiFactory fsUgiFactory = null;
@@ -348,7 +355,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     this.shufflePort.set(ShuffleHandler.get().getPort());
     getConfig()
         .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort());
-    LlapOutputFormatService.initializeAndStart(getConfig());
+    LlapOutputFormatService.initializeAndStart(getConfig(), secretManager);
     super.serviceStart();
 
     // Setup the actual ports in the configuration.

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
index 7ccd28f..eb7a8eb 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java
@@ -72,18 +72,19 @@ public class LlapProtocolServerImpl extends AbstractService
   private final int srvPort, mngPort;
   private RPC.Server server, mngServer;
   private final AtomicReference<InetSocketAddress> srvAddress, mngAddress;
-  private SecretManager zkSecretManager;
+  private final SecretManager secretManager;
   private String clusterUser = null;
   private boolean isRestrictedToClusterUser = false;
   private final DaemonId daemonId;
   private TokenRequiresSigning isSigningRequiredConfig = TokenRequiresSigning.TRUE;
 
-  public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner,
-      AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress,
-      int srvPort, int mngPort, DaemonId daemonId) {
+  public LlapProtocolServerImpl(SecretManager secretManager, int numHandlers,
+      ContainerRunner containerRunner, AtomicReference<InetSocketAddress> srvAddress,
+      AtomicReference<InetSocketAddress> mngAddress, int srvPort, int mngPort, DaemonId daemonId) {
     super("LlapDaemonProtocolServerImpl");
     this.numHandlers = numHandlers;
     this.containerRunner = containerRunner;
+    this.secretManager = secretManager;
     this.srvAddress = srvAddress;
     this.srvPort = srvPort;
     this.mngAddress = mngAddress;
@@ -156,8 +157,6 @@ public class LlapProtocolServerImpl extends AbstractService
     }
     String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
         llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
-    zkSecretManager = SecretManager.createSecretManager(
-        conf, llapPrincipal, llapKeytab, daemonId.getClusterString());
 
     // Start the protocol server after properly authenticating with daemon keytab.
     UserGroupInformation daemonUgi = null;
@@ -269,8 +268,8 @@ public class LlapProtocolServerImpl extends AbstractService
         .setBindAddress(addr.getHostName())
         .setPort(addr.getPort())
         .setNumHandlers(numHandlers);
-    if (zkSecretManager != null) {
-      builder = builder.setSecretManager(zkSecretManager);
+    if (secretManager != null) {
+      builder = builder.setSecretManager(secretManager);
     }
     RPC.Server server = builder.build();
     if (isSecurityEnabled) {
@@ -283,7 +282,7 @@ public class LlapProtocolServerImpl extends AbstractService
   @Override
   public GetTokenResponseProto getDelegationToken(RpcController controller,
       GetTokenRequestProto request) throws ServiceException {
-    if (zkSecretManager == null) {
+    if (secretManager == null) {
       throw new ServiceException("Operation not supported on unsecure cluster");
     }
     UserGroupInformation callingUser = null;
@@ -292,7 +291,7 @@ public class LlapProtocolServerImpl extends AbstractService
       callingUser = UserGroupInformation.getCurrentUser();
       // Determine if the user would need to sign fragments.
       boolean isSigningRequired = determineIfSigningIsRequired(callingUser);
-      token = zkSecretManager.createLlapToken(
+      token = secretManager.createLlapToken(
           request.hasAppId() ? request.getAppId() : null, isSigningRequired);
     } catch (IOException e) {
       throw new ServiceException(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
index fd37a06..b38e9d6 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java
@@ -44,7 +44,7 @@ public class TestLlapDaemonProtocolServerImpl {
     int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS);
     ContainerRunner containerRunnerMock = mock(ContainerRunner.class);
     LlapProtocolServerImpl server =
-        new LlapProtocolServerImpl(numHandlers, containerRunnerMock,
+        new LlapProtocolServerImpl(null, numHandlers, containerRunnerMock,
           new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(),
           rpcPort, rpcPort + 1, null);
     when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn(

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java
index 8e98aba..beb0e2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java
@@ -34,9 +34,6 @@ import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 
 import com.google.common.base.Preconditions;
 
-/**
- *
- */
 public class LlapOutputFormat<K extends Writable, V extends Writable>
   implements OutputFormat<K, V> {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 06660b3..151a31f 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -19,32 +19,21 @@ package org.apache.hadoop.hive.llap;
 import java.util.Map;
 import java.util.HashMap;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;
+import org.apache.hadoop.hive.llap.security.SecretManager;
 
 import com.google.common.base.Preconditions;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -58,15 +47,14 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.DelimiterBasedFrameDecoder;
-import io.netty.handler.codec.Delimiters;
-import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
 import io.netty.handler.codec.string.StringEncoder;
-import io.netty.util.concurrent.Future;
 
 
 /**
- * Responsible for sending back result set data to the connections made by external clients via the LLAP input format.
+ * Responsible for sending back result set data to the connections
+ * made by external clients via the LLAP input format.
  */
 public class LlapOutputFormatService {
 
@@ -76,24 +64,30 @@ public class LlapOutputFormatService {
   private static final AtomicBoolean initing = new AtomicBoolean(false);
   private static LlapOutputFormatService INSTANCE;
 
-  private final Map<String, RecordWriter> writers;
+  // TODO: the global lock might be to coarse here.
+  private final Object lock = new Object();
+  private final Map<String, RecordWriter<?,?>> writers = new HashMap<String, RecordWriter<?,?>>();
+  private final Map<String, String> errors = new HashMap<String, String>();
   private final Configuration conf;
   private static final int WAIT_TIME = 5;
-  private static final int MAX_QUERY_ID_LENGTH = 256;
 
   private EventLoopGroup eventLoopGroup;
   private ServerBootstrap serverBootstrap;
   private ChannelFuture listeningChannelFuture;
   private int port;
+  private final SecretManager sm;
+  private final long writerTimeoutMs;
 
-  private LlapOutputFormatService(Configuration conf) throws IOException {
-    writers = new HashMap<String, RecordWriter>();
+  private LlapOutputFormatService(Configuration conf, SecretManager sm) throws IOException {
+    this.sm = sm;
     this.conf = conf;
+    this.writerTimeoutMs = HiveConf.getTimeVar(
+        conf, ConfVars.LLAP_DAEMON_OUTPUT_STREAM_TIMEOUT, TimeUnit.MILLISECONDS);
   }
 
-  public static void initializeAndStart(Configuration conf) throws Exception {
+  public static void initializeAndStart(Configuration conf, SecretManager sm) throws Exception {
     if (!initing.getAndSet(true)) {
-      INSTANCE = new LlapOutputFormatService(conf);
+      INSTANCE = new LlapOutputFormatService(conf, sm);
       INSTANCE.start();
       started.set(true);
     }
@@ -136,50 +130,96 @@ public class LlapOutputFormatService {
       LOG.warn("LlapOutputFormatService does not appear to have a listening 
port to close.");
     }
 
-    Future terminationFuture = eventLoopGroup.shutdownGracefully(1, WAIT_TIME, 
TimeUnit.SECONDS);
-    terminationFuture.sync();
+    eventLoopGroup.shutdownGracefully(1, WAIT_TIME, TimeUnit.SECONDS).sync();
   }
 
+  @SuppressWarnings("unchecked")
   public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, 
InterruptedException {
-    RecordWriter writer = null;
-    synchronized(INSTANCE) {
+    RecordWriter<?, ?> writer = null;
+    synchronized (lock) {
+      long startTime = System.nanoTime();
+      boolean isFirst = true;
       while ((writer = writers.get(id)) == null) {
-        LOG.info("Waiting for writer for: "+id);
-        INSTANCE.wait();
+        String error = errors.remove(id);
+        if (error != null) {
+          throw new IOException(error);
+        }
+        if (isFirst) {
+          LOG.info("Waiting for writer for " + id);
+          isFirst = false;
+        }
+        if (((System.nanoTime() - startTime) / 1000000) > writerTimeoutMs) {
+          throw new IOException("The writer for " + id + " has timed out after 
"
+              + writerTimeoutMs + "ms");
+        }
+        lock.wait(writerTimeoutMs);
       }
     }
     LOG.info("Returning writer for: "+id);
-    return writer;
+    return (RecordWriter<K, V>) writer;
   }
 
   public int getPort() {
     return port;
   }
 
-  protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> {
+  protected class LlapOutputFormatServiceHandler
+    extends SimpleChannelInboundHandler<LlapOutputSocketInitMessage> {
     private final int sendBufferSize;
     public LlapOutputFormatServiceHandler(final int sendBufferSize) {
       this.sendBufferSize = sendBufferSize;
     }
 
     @Override
-    public void channelRead0(ChannelHandlerContext ctx, String msg) {
-      String id = msg;
-      registerReader(ctx, id);
+    public void channelRead0(ChannelHandlerContext ctx, LlapOutputSocketInitMessage msg) {
+      String id = msg.getFragmentId();
+      byte[] tokenBytes = msg.hasToken() ? msg.getToken().toByteArray() : null;
+      try {
+        registerReader(ctx, id, tokenBytes);
+      } catch (Throwable t) {
+        // Make sure we fail the channel if something goes wrong.
+        // We internally handle all the "expected" exceptions, so log a lot of information here.
+        failChannel(ctx, id, StringUtils.stringifyException(t));
+      }
     }
 
-    private void registerReader(ChannelHandlerContext ctx, String id) {
-      synchronized(INSTANCE) {
-        LOG.debug("registering socket for: " + id);
-        OutputStream stream = new ChannelOutputStream(ctx, id, sendBufferSize);
-        LlapRecordWriter writer = new LlapRecordWriter(stream);
-        writers.put(id, writer);
-
-        // Add listener to handle any cleanup for when the connection is closed
-        ctx.channel().closeFuture().addListener(new 
LlapOutputFormatChannelCloseListener(id));
+    private void registerReader(ChannelHandlerContext ctx, String id, byte[] 
tokenBytes) {
+      if (sm != null) {
+        try {
+          sm.verifyToken(tokenBytes);
+        } catch (SecurityException | IOException ex) {
+          failChannel(ctx, id, ex.getMessage());
+          return;
+        }
+      }
+      LOG.debug("registering socket for: " + id);
+      @SuppressWarnings("rawtypes")
+      LlapRecordWriter writer = new LlapRecordWriter(
+          new ChannelOutputStream(ctx, id, sendBufferSize));
+      boolean isFailed = true;
+      synchronized (lock) {
+        if (!writers.containsKey(id)) {
+          isFailed = false;
+          writers.put(id, writer);
+          // Add listener to handle any cleanup for when the connection is 
closed
+          ctx.channel().closeFuture().addListener(new 
LlapOutputFormatChannelCloseListener(id));
+          lock.notifyAll();
+        }
+      }
+      if (isFailed) {
+        failChannel(ctx, id, "Writer already registered for " + id);
+      }
+    }
 
-        INSTANCE.notifyAll();
+    /** Do not call under lock. */
+    private void failChannel(ChannelHandlerContext ctx, String id, String 
error) {
+      // TODO: write error to the channel? there's no mechanism for that now.
+      ctx.close();
+      synchronized (lock) {
+        errors.put(id, error);
+        lock.notifyAll();
       }
+      LOG.error(error);
     }
   }
 
@@ -192,8 +232,7 @@ public class LlapOutputFormatService {
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-      RecordWriter writer = null;
-
+      RecordWriter<?, ?> writer = null;
       synchronized (INSTANCE) {
         writer = writers.remove(id);
       }
@@ -213,8 +252,8 @@ public class LlapOutputFormatService {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
       ch.pipeline().addLast(
-          new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, 
Delimiters.nulDelimiter()),
-          new StringDecoder(),
+          new ProtobufVarint32FrameDecoder(),
+          new 
ProtobufDecoder(LlapOutputSocketInitMessage.getDefaultInstance()),
           new StringEncoder(),
           new LlapOutputFormatServiceHandler(sendBufferSize));
     }
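
With the pipeline now expecting a varint-delimited LlapOutputSocketInitMessage
instead of a NUL-terminated string id, the client side of the handshake looks
roughly like the sketch below. host, port, fragmentId and tokenBytes are
placeholders; the real client code lives in LlapBaseInputFormat, which is not
shown in this part of the diff.

    // Hypothetical client handshake: writeDelimitedTo() emits the varint
    // length prefix that ProtobufVarint32FrameDecoder expects on the server.
    try (Socket socket = new Socket(host, port)) {
      LlapOutputSocketInitMessage.Builder init =
          LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId);
      if (tokenBytes != null) {
        init.setToken(ByteString.copyFrom(tokenBytes)); // token only on secure clusters
      }
      OutputStream out = socket.getOutputStream();
      init.build().writeDelimitedTo(out);
      out.flush();
      // ... the fragment output is then read back from socket.getInputStream() ...
    }
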

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index f4a9cac..cc2ab39 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AbstractMapOperator;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.llap.tez.Converters;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
@@ -94,11 +95,9 @@ public class MapRecordProcessor extends RecordProcessor {
   public MapRecordProcessor(final JobConf jconf, final ProcessorContext 
context) throws Exception {
     super(jconf, context);
     String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID);
-    if (LlapProxy.isDaemon()) { // do not cache plan
-      String id = queryId + "_" + context.getTaskIndex();
-      l4j.info("LLAP_OF_ID: "+id);
-      jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
-      cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
+    if (LlapProxy.isDaemon()) {
+      cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); // do not 
cache plan
+      setLlapOfFragmentId(context);
     } else {
       cache = ObjectCacheFactory.getCache(jconf, queryId);
     }
@@ -108,6 +107,15 @@ public class MapRecordProcessor extends RecordProcessor {
     nRows = 0;
   }
 
+  private void setLlapOfFragmentId(final ProcessorContext context) {
+    // TODO: could we do this only if the OF is actually used?
+    String attemptId = Converters.createTaskAttemptId(context).toString();
+    if (l4j.isDebugEnabled()) {
+      l4j.debug("Setting the LLAP fragment ID for OF to " + attemptId);
+    }
+    jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, attemptId);
+  }
+
   @Override
   void init(MRTaskReporter mrReporter,
       Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) 
throws Exception {
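
On the consuming side of the same id, the lookup is assumed to happen inside
LlapOutputFormat.getRecordWriter (not shown in this diff); a hedged sketch,
inside a method that declares IOException and InterruptedException:

    // Assumed wiring: the attempt id written into the job conf above is the
    // key under which the output service registered the writer for this fragment.
    String id = jconf.get(LlapOutputFormat.LLAP_OF_ID_KEY);
    RecordWriter<NullWritable, Text> writer =
        LlapOutputFormatService.get().<NullWritable, Text>getWriter(id);
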

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 9c385d1..37ae668 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -93,6 +94,8 @@ public final class PlanUtils {
   public static enum ExpressionTypes {
     FIELD, JEXL
   };
+  public static final String LLAP_OUTPUT_FORMAT_KEY = "Llap";
+  private static final String LLAP_OF_SH_CLASS = 
"org.apache.hadoop.hive.llap.LlapStorageHandler";
 
   public static synchronized long getCountForMapJoinDumpFilePrefix() {
     return countForMapJoinDumpFilePrefix++;
@@ -230,7 +233,7 @@ public final class PlanUtils {
 
     return getTableDesc(serdeClass, separatorCode, columns, columnTypes,
         lastColumnTakesRestOfTheLine, useDelimitedJSON, "TextFile");
- }
+  }
 
   public static TableDesc getTableDesc(
       Class<? extends Deserializer> serdeClass, String separatorCode,
@@ -272,12 +275,10 @@ public final class PlanUtils {
       inputFormat = RCFileInputFormat.class;
       outputFormat = RCFileOutputFormat.class;
       assert serdeClass == ColumnarSerDe.class;
-    } else if ("Llap".equalsIgnoreCase(fileFormat)) {
+    } else if (LLAP_OUTPUT_FORMAT_KEY.equalsIgnoreCase(fileFormat)) {
       inputFormat = TextInputFormat.class;
       outputFormat = LlapOutputFormat.class;
-      properties.setProperty(
-          
org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
-          "org.apache.hadoop.hive.llap.LlapStorageHandler");
+      properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, 
LLAP_OF_SH_CLASS);
     } else { // use TextFile by default
       inputFormat = TextInputFormat.class;
       outputFormat = IgnoreKeyTextOutputFormat.class;

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index e2f0e84..a2ad4f9 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -41,13 +41,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.LlapInputSplit;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
-import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.SubmitWorkInfo;
 import org.apache.hadoop.hive.llap.TypeDesc;
 import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -57,6 +55,7 @@ import 
org.apache.hadoop.hive.llap.security.LlapSigner.SignedMessage;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver;
@@ -71,6 +70,7 @@ import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -81,13 +81,12 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SplitLocationInfo;
@@ -115,7 +114,7 @@ import com.google.common.base.Preconditions;
 
 /**
  * GenericUDTFGetSplits.
- *
+ * 
  */
 @Description(name = "get_splits", value = "_FUNC_(string,int) - "
     + "Returns an array of length int serialized splits for the referenced 
tables string.")
@@ -131,7 +130,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
   @Override
   public StructObjectInspector initialize(ObjectInspector[] arguments)
-    throws UDFArgumentException {
+      throws UDFArgumentException {
 
     LOG.debug("initializing GenericUDFGetSplits");
 
@@ -142,14 +141,15 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     LOG.debug("Initialized conf, jc and metastore connection");
 
     if (arguments.length != 2) {
-      throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 
arguments.");
+      throw new UDFArgumentLengthException(
+          "The function GET_SPLITS accepts 2 arguments.");
     } else if (!(arguments[0] instanceof StringObjectInspector)) {
-      LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
+      LOG.error("Got " + arguments[0].getTypeName() + " instead of string.");
       throw new UDFArgumentTypeException(0, "\""
           + "string\" is expected at function GET_SPLITS, " + "but \""
           + arguments[0].getTypeName() + "\" is found");
     } else if (!(arguments[1] instanceof IntObjectInspector)) {
-      LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
+      LOG.error("Got " + arguments[1].getTypeName() + " instead of int.");
       throw new UDFArgumentTypeException(1, "\""
           + "int\" is expected at function GET_SPLITS, " + "but \""
           + arguments[1].getTypeName() + "\" is found");
@@ -159,9 +159,10 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     intOI = (IntObjectInspector) arguments[1];
 
     List<String> names = Arrays.asList("split");
-    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
-      PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
-    StructObjectInspector outputOI = 
ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+    List<ObjectInspector> fieldOIs = Arrays
+        .<ObjectInspector> 
asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+    StructObjectInspector outputOI = ObjectInspectorFactory
+        .getStandardStructObjectInspector(names, fieldOIs);
 
     LOG.debug("done initializing GenericUDFGetSplits");
     return outputOI;
@@ -190,7 +191,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     Schema schema = fragment.schema;
 
     try {
-      for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
+      for (InputSplit s : getSplits(jc, num, tezWork, schema)) {
         Object[] os = new Object[1];
         bos.reset();
         s.write(dos);
@@ -198,24 +199,26 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         os[0] = frozen;
         forward(os);
       }
-    } catch(Exception e) {
+    } catch (Exception e) {
       throw new HiveException(e);
     }
   }
 
-  public PlanFragment createPlanFragment(String query, int num) throws 
HiveException {
+  public PlanFragment createPlanFragment(String query, int num)
+      throws HiveException {
 
     HiveConf conf = new HiveConf(SessionState.get().getConf());
     HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap");
+    HiveConf.setVar(conf, ConfVars.HIVEQUERYRESULTFILEFORMAT, 
PlanUtils.LLAP_OUTPUT_FORMAT_KEY);
 
-    String originalMode = HiveConf.getVar(conf, 
HiveConf.ConfVars.HIVE_EXECUTION_MODE);
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap");
-    HiveConf.setBoolVar(conf, 
HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS, true);
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS, 
true);
+    String originalMode = HiveConf.getVar(conf,
+        ConfVars.HIVE_EXECUTION_MODE);
+    HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, "llap");
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS, 
true);
+    HiveConf.setBoolVar(conf, ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS, true);
     conf.setBoolean(TezSplitGrouper.TEZ_GROUPING_NODE_LOCAL_ONLY, true);
     // Tez/LLAP requires RPC query plan
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN, true);
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
 
     try {
       jc = DagUtils.getInstance().createConfiguration(conf);
@@ -224,14 +227,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
 
     Driver driver = new Driver(conf);
-    CommandProcessorResponse cpr;
-
-    LOG.info("setting fetch.task.conversion to none and query file format to 
\""
-        + LlapOutputFormat.class.getName()+"\"");
-
-    cpr = driver.compileAndRespond(query);
-    if(cpr.getResponseCode() != 0) {
-      throw new HiveException("Failed to compile query: "+cpr.getException());
+    CommandProcessorResponse cpr = driver.compileAndRespond(query);
+    if (cpr.getResponseCode() != 0) {
+      throw new HiveException("Failed to compile query: " + 
cpr.getException());
     }
 
     QueryPlan plan = driver.getPlan();
@@ -248,11 +246,11 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
       String tableName = 
"table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
 
-      String ctas = "create temporary table "+tableName+" as "+query;
-      LOG.info("CTAS: "+ctas);
+      String ctas = "create temporary table " + tableName + " as " + query;
+      LOG.info("Materializing the query for LLAPIF; CTAS: " + ctas);
 
       try {
-        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, 
originalMode);
+        HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, originalMode);
         cpr = driver.run(ctas, false);
       } catch (CommandNeedRetryException e) {
         throw new HiveException(e);
@@ -262,7 +260,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         throw new HiveException("Failed to create temp table: " + 
cpr.getException());
       }
 
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap");
+      HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, "llap");
       query = "select * from " + tableName;
       cpr = driver.compileAndRespond(query);
       if(cpr.getResponseCode() != 0) {
@@ -310,9 +308,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
       // we have the dag now proceed to get the splits:
       Preconditions.checkState(HiveConf.getBoolVar(wxConf,
-              HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
+              ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
       Preconditions.checkState(HiveConf.getBoolVar(wxConf,
-              HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
+              ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
       HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, 
mapWork);
       List<Event> eventList = splitGenerator.initialize();
 
@@ -457,9 +455,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   }
 
   /**
-   * Returns a local resource representing a jar. This resource will be used 
to execute the plan on
-   * the cluster.
-   *
+   * Returns a local resource representing a jar. This resource will be used to
+   * execute the plan on the cluster.
+   * 
    * @param localJarPath
    *          Local path to the jar to be localized.
    * @return LocalResource corresponding to the localized hive exec resource.
@@ -470,9 +468,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
    * @throws URISyntaxException
    *           when current jar location cannot be determined.
    */
-  private LocalResource createJarLocalResource(String localJarPath, DagUtils 
utils,
-      Configuration conf)
-    throws IOException, LoginException, IllegalArgumentException, 
FileNotFoundException {
+  private LocalResource createJarLocalResource(String localJarPath,
+      DagUtils utils, Configuration conf) throws IOException, LoginException,
+      IllegalArgumentException, FileNotFoundException {
     FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
     assert destDirStatus != null;
     Path destDirPath = destDirStatus.getPath();
@@ -482,19 +480,24 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
     String destFileName = localFile.getName();
 
-    // Now, try to find the file based on SHA and name. Currently we require 
exact name match.
-    // We could also allow cutting off versions and other stuff provided that 
SHA matches...
+    // Now, try to find the file based on SHA and name. Currently we require
+    // exact name match.
+    // We could also allow cutting off versions and other stuff provided that
+    // SHA matches...
     destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
-      + FilenameUtils.EXTENSION_SEPARATOR + 
FilenameUtils.getExtension(destFileName);
+        + FilenameUtils.EXTENSION_SEPARATOR
+        + FilenameUtils.getExtension(destFileName);
 
-    // TODO: if this method is ever called on more than one jar, getting the 
dir and the
+    // TODO: if this method is ever called on more than one jar, getting the 
dir
+    // and the
     // list need to be refactored out to be done only once.
     Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
-    return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, 
conf);
+    return utils.localizeResource(localFile, destFile, LocalResourceType.FILE,
+        conf);
   }
 
-  private String getSha(Path localFile, Configuration conf)
-    throws IOException, IllegalArgumentException {
+  private String getSha(Path localFile, Configuration conf) throws IOException,
+      IllegalArgumentException {
     InputStream is = null;
     try {
       FileSystem localFs = FileSystem.getLocal(conf);
@@ -510,57 +513,60 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   private TypeDesc convertTypeString(String typeString) throws HiveException {
     TypeDesc typeDesc;
     TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
-    Preconditions.checkState(typeInfo.getCategory() == 
ObjectInspector.Category.PRIMITIVE,
+    Preconditions.checkState(
+        typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE,
         "Unsupported non-primitive type " + typeString);
 
     switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
-      case BOOLEAN:
-        typeDesc = new TypeDesc(TypeDesc.Type.BOOLEAN);
-        break;
-      case BYTE:
-        typeDesc = new TypeDesc(TypeDesc.Type.TINYINT);
-        break;
-      case SHORT:
-        typeDesc = new TypeDesc(TypeDesc.Type.SMALLINT);
-        break;
-      case INT:
-        typeDesc = new TypeDesc(TypeDesc.Type.INT);
-        break;
-      case LONG:
-        typeDesc = new TypeDesc(TypeDesc.Type.BIGINT);
-        break;
-      case FLOAT:
-        typeDesc = new TypeDesc(TypeDesc.Type.FLOAT);
-        break;
-      case DOUBLE:
-        typeDesc = new TypeDesc(TypeDesc.Type.DOUBLE);
-        break;
-      case STRING:
-        typeDesc = new TypeDesc(TypeDesc.Type.STRING);
-        break;
-      case CHAR:
-        CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.CHAR, charTypeInfo.getLength());
-        break;
-      case VARCHAR:
-        VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR, 
varcharTypeInfo.getLength());
-        break;
-      case DATE:
-        typeDesc = new TypeDesc(TypeDesc.Type.DATE);
-        break;
-      case TIMESTAMP:
-        typeDesc = new TypeDesc(TypeDesc.Type.TIMESTAMP);
-        break;
-      case BINARY:
-        typeDesc = new TypeDesc(TypeDesc.Type.BINARY);
-        break;
-      case DECIMAL:
-        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.DECIMAL, 
decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
-        break;
-      default:
-        throw new HiveException("Unsupported type " + typeString);
+    case BOOLEAN:
+      typeDesc = new TypeDesc(TypeDesc.Type.BOOLEAN);
+      break;
+    case BYTE:
+      typeDesc = new TypeDesc(TypeDesc.Type.TINYINT);
+      break;
+    case SHORT:
+      typeDesc = new TypeDesc(TypeDesc.Type.SMALLINT);
+      break;
+    case INT:
+      typeDesc = new TypeDesc(TypeDesc.Type.INT);
+      break;
+    case LONG:
+      typeDesc = new TypeDesc(TypeDesc.Type.BIGINT);
+      break;
+    case FLOAT:
+      typeDesc = new TypeDesc(TypeDesc.Type.FLOAT);
+      break;
+    case DOUBLE:
+      typeDesc = new TypeDesc(TypeDesc.Type.DOUBLE);
+      break;
+    case STRING:
+      typeDesc = new TypeDesc(TypeDesc.Type.STRING);
+      break;
+    case CHAR:
+      CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
+      typeDesc = new TypeDesc(TypeDesc.Type.CHAR, charTypeInfo.getLength());
+      break;
+    case VARCHAR:
+      VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
+      typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR,
+          varcharTypeInfo.getLength());
+      break;
+    case DATE:
+      typeDesc = new TypeDesc(TypeDesc.Type.DATE);
+      break;
+    case TIMESTAMP:
+      typeDesc = new TypeDesc(TypeDesc.Type.TIMESTAMP);
+      break;
+    case BINARY:
+      typeDesc = new TypeDesc(TypeDesc.Type.BINARY);
+      break;
+    case DECIMAL:
+      DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+      typeDesc = new TypeDesc(TypeDesc.Type.DECIMAL,
+          decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+      break;
+    default:
+      throw new HiveException("Unsupported type " + typeString);
     }
 
     return typeDesc;

http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java 
b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 1d592fb..577037c 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -27,35 +27,21 @@ import java.net.Socket;
 
 import java.io.OutputStream;
 import java.io.InputStream;
-import java.io.File;
 import java.io.IOException;
-import java.io.FileInputStream;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import junit.framework.TestCase;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
+import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 
-
 public class TestLlapOutputFormat {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestLlapOutputFormat.class);
@@ -68,7 +54,7 @@ public class TestLlapOutputFormat {
     Configuration conf = new Configuration();
     // Pick random avail port
     HiveConf.setIntVar(conf, 
HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
-    LlapOutputFormatService.initializeAndStart(conf);
+    LlapOutputFormatService.initializeAndStart(conf, null);
     service = LlapOutputFormatService.get();
     LlapProxy.setDaemon(true);
     LOG.debug("Output service up");
@@ -86,7 +72,7 @@ public class TestLlapOutputFormat {
     JobConf job = new JobConf();
 
     for (int k = 0; k < 5; ++k) {
-      String id = "foobar"+k;
+      String id = "foobar" + k;
       job.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
       LlapOutputFormat format = new LlapOutputFormat();
 
@@ -95,9 +81,10 @@ public class TestLlapOutputFormat {
 
       LOG.debug("Socket connected");
 
-      socket.getOutputStream().write(id.getBytes());
-      socket.getOutputStream().write(0);
-      socket.getOutputStream().flush();
+      OutputStream socketStream = socket.getOutputStream();
+      LlapOutputSocketInitMessage.newBuilder()
+        .setFragmentId(id).build().writeDelimitedTo(socketStream);
+      socketStream.flush();
 
       Thread.sleep(3000);
 
@@ -131,7 +118,38 @@ public class TestLlapOutputFormat {
 
       reader.close();
 
-      Assert.assertEquals(count,10);
+      Assert.assertEquals(10, count);
+    }
+  }
+
+
+  @Test
+  public void testBadClientMessage() throws Exception {
+    JobConf job = new JobConf();
+    String id = "foobar";
+    job.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
+    LlapOutputFormat format = new LlapOutputFormat();
+
+    Socket socket = new Socket("localhost", service.getPort());
+
+    LOG.debug("Socket connected");
+
+    OutputStream socketStream = socket.getOutputStream();
+    LlapOutputSocketInitMessage.newBuilder()
+      .setFragmentId(id).build().writeDelimitedTo(socketStream);
+    LlapOutputSocketInitMessage.newBuilder()
+      .setFragmentId(id).build().writeDelimitedTo(socketStream);
+    socketStream.flush();
+
+    Thread.sleep(3000);
+
+    LOG.debug("Data written");
+
+    try {
+      format.getRecordWriter(null, job, null, null);
+      Assert.fail("Didn't throw");
+    } catch (IOException ex) {
+      // Expected.
     }
   }
 }
