Repository: hive Updated Branches: refs/heads/master 59e6c83fd -> 0d69a88b9
HIVE-13827 : LLAPIF: authentication on the output channel (Sergey Shelukhin, reviewed by Jason Dere) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0d69a88b Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0d69a88b Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0d69a88b Branch: refs/heads/master Commit: 0d69a88b9f317e93194e08aee409f0b1b6ccab7c Parents: 59e6c83 Author: Sergey Shelukhin <ser...@apache.org> Authored: Mon Jun 13 18:57:35 2016 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Mon Jun 13 18:57:35 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../daemon/rpc/LlapDaemonProtocolProtos.java | 665 ++++++++++++++++++- .../hive/llap/security/SecretManager.java | 11 + .../apache/hadoop/hive/llap/tez/Converters.java | 13 + .../src/protobuf/LlapDaemonProtocol.proto | 6 + .../hadoop/hive/llap/LlapBaseInputFormat.java | 32 +- .../hive/llap/daemon/impl/LlapDaemon.java | 11 +- .../daemon/impl/LlapProtocolServerImpl.java | 19 +- .../impl/TestLlapDaemonProtocolServerImpl.java | 2 +- .../hadoop/hive/llap/LlapOutputFormat.java | 3 - .../hive/llap/LlapOutputFormatService.java | 141 ++-- .../hive/ql/exec/tez/MapRecordProcessor.java | 18 +- .../apache/hadoop/hive/ql/plan/PlanUtils.java | 11 +- .../ql/udf/generic/GenericUDTFGetSplits.java | 204 +++--- .../hadoop/hive/llap/TestLlapOutputFormat.java | 60 +- 15 files changed, 971 insertions(+), 229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 285caa3..761dbb2 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2883,6 +2883,10 @@ public class HiveConf extends Configuration { "protocol or ZK paths), similar to how ssh refuses a key with bad access permissions."), LLAP_DAEMON_OUTPUT_SERVICE_PORT("hive.llap.daemon.output.service.port", 15003, "LLAP daemon output service port"), + LLAP_DAEMON_OUTPUT_STREAM_TIMEOUT("hive.llap.daemon.output.stream.timeout", "120s", + new TimeValidator(TimeUnit.SECONDS), + "The timeout for the client to connect to LLAP output service and start the fragment\n" + + "output after sending the fragment. 
The fragment will fail if its output is not claimed."), LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE("hive.llap.daemon.output.service.send.buffer.size", 128 * 1024, "Send buffer size to be used by LLAP daemon output service"), LLAP_ENABLE_GRACE_JOIN_IN_LLAP("hive.llap.enable.grace.join.in.llap", false, http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java ---------------------------------------------------------------------- diff --git a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java index 856ea30..56a1361 100644 --- a/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java +++ b/llap-common/src/gen/protobuf/gen-java/org/apache/hadoop/hive/llap/daemon/rpc/LlapDaemonProtocolProtos.java @@ -16441,6 +16441,624 @@ public final class LlapDaemonProtocolProtos { // @@protoc_insertion_point(class_scope:GetTokenResponseProto) } + public interface LlapOutputSocketInitMessageOrBuilder + extends com.google.protobuf.MessageOrBuilder { + + // required string fragment_id = 1; + /** + * <code>required string fragment_id = 1;</code> + */ + boolean hasFragmentId(); + /** + * <code>required string fragment_id = 1;</code> + */ + java.lang.String getFragmentId(); + /** + * <code>required string fragment_id = 1;</code> + */ + com.google.protobuf.ByteString + getFragmentIdBytes(); + + // optional bytes token = 2; + /** + * <code>optional bytes token = 2;</code> + */ + boolean hasToken(); + /** + * <code>optional bytes token = 2;</code> + */ + com.google.protobuf.ByteString getToken(); + } + /** + * Protobuf type {@code LlapOutputSocketInitMessage} + * + * <pre> + * The message sent by external client to claim the output from the output socket. + * </pre> + */ + public static final class LlapOutputSocketInitMessage extends + com.google.protobuf.GeneratedMessage + implements LlapOutputSocketInitMessageOrBuilder { + // Use LlapOutputSocketInitMessage.newBuilder() to construct. 
+ private LlapOutputSocketInitMessage(com.google.protobuf.GeneratedMessage.Builder<?> builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private LlapOutputSocketInitMessage(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final LlapOutputSocketInitMessage defaultInstance; + public static LlapOutputSocketInitMessage getDefaultInstance() { + return defaultInstance; + } + + public LlapOutputSocketInitMessage getDefaultInstanceForType() { + return defaultInstance; + } + + private final com.google.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private LlapOutputSocketInitMessage( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + fragmentId_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + token_ = input.readBytes(); + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.Builder.class); + } + + public static com.google.protobuf.Parser<LlapOutputSocketInitMessage> PARSER = + new com.google.protobuf.AbstractParser<LlapOutputSocketInitMessage>() { + public LlapOutputSocketInitMessage parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new LlapOutputSocketInitMessage(input, extensionRegistry); + } + }; + + @java.lang.Override + public com.google.protobuf.Parser<LlapOutputSocketInitMessage> getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string fragment_id = 1; + public static final int FRAGMENT_ID_FIELD_NUMBER = 1; + private java.lang.Object fragmentId_; + /** + * <code>required string fragment_id = 1;</code> + */ + public boolean hasFragmentId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * 
<code>required string fragment_id = 1;</code> + */ + public java.lang.String getFragmentId() { + java.lang.Object ref = fragmentId_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + com.google.protobuf.ByteString bs = + (com.google.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + fragmentId_ = s; + } + return s; + } + } + /** + * <code>required string fragment_id = 1;</code> + */ + public com.google.protobuf.ByteString + getFragmentIdBytes() { + java.lang.Object ref = fragmentId_; + if (ref instanceof java.lang.String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + fragmentId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + + // optional bytes token = 2; + public static final int TOKEN_FIELD_NUMBER = 2; + private com.google.protobuf.ByteString token_; + /** + * <code>optional bytes token = 2;</code> + */ + public boolean hasToken() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * <code>optional bytes token = 2;</code> + */ + public com.google.protobuf.ByteString getToken() { + return token_; + } + + private void initFields() { + fragmentId_ = ""; + token_ = com.google.protobuf.ByteString.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasFragmentId()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getFragmentIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, token_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(1, getFragmentIdBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, token_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage)) { + return super.equals(obj); + } + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage other = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage) obj; + + boolean result = true; + result = result && (hasFragmentId() == other.hasFragmentId()); + if (hasFragmentId()) { + result = result && getFragmentId() + .equals(other.getFragmentId()); + } + result = result && (hasToken() == other.hasToken()); + if (hasToken()) { + result = result && getToken() + .equals(other.getToken()); + } + result = result && 
+ getUnknownFields().equals(other.getUnknownFields()); + return result; + } + + private int memoizedHashCode = 0; + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptorForType().hashCode(); + if (hasFragmentId()) { + hash = (37 * hash) + FRAGMENT_ID_FIELD_NUMBER; + hash = (53 * hash) + getFragmentId().hashCode(); + } + if (hasToken()) { + hash = (37 * hash) + TOKEN_FIELD_NUMBER; + hash = (53 * hash) + getToken().hashCode(); + } + hash = (29 * hash) + getUnknownFields().hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder 
newBuilder(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code LlapOutputSocketInitMessage} + * + * <pre> + * The message sent by external client to claim the output from the output socket. + * </pre> + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder<Builder> + implements org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessageOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.class, org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.Builder.class); + } + + // Construct using org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + fragmentId_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + token_ = com.google.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.internal_static_LlapOutputSocketInitMessage_descriptor; + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage getDefaultInstanceForType() { + return org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.getDefaultInstance(); + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage build() { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage buildPartial() { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage result = new org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if 
(((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.fragmentId_ = fragmentId_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.token_ = token_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage) { + return mergeFrom((org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage other) { + if (other == org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.getDefaultInstance()) return this; + if (other.hasFragmentId()) { + bitField0_ |= 0x00000001; + fragmentId_ = other.fragmentId_; + onChanged(); + } + if (other.hasToken()) { + setToken(other.getToken()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasFragmentId()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string fragment_id = 1; + private java.lang.Object fragmentId_ = ""; + /** + * <code>required string fragment_id = 1;</code> + */ + public boolean hasFragmentId() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <code>required string fragment_id = 1;</code> + */ + public java.lang.String getFragmentId() { + java.lang.Object ref = fragmentId_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((com.google.protobuf.ByteString) ref) + .toStringUtf8(); + fragmentId_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * <code>required string fragment_id = 1;</code> + */ + public com.google.protobuf.ByteString + getFragmentIdBytes() { + java.lang.Object ref = fragmentId_; + if (ref instanceof String) { + com.google.protobuf.ByteString b = + com.google.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + fragmentId_ = b; + return b; + } else { + return (com.google.protobuf.ByteString) ref; + } + } + /** + * <code>required string fragment_id = 1;</code> + */ + public Builder setFragmentId( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + fragmentId_ = value; + onChanged(); + return this; + } + /** + * <code>required string fragment_id = 1;</code> + */ + public Builder clearFragmentId() { + bitField0_ = (bitField0_ & ~0x00000001); + fragmentId_ = getDefaultInstance().getFragmentId(); + onChanged(); + return this; + } + /** + * <code>required string fragment_id = 1;</code> + 
*/ + public Builder setFragmentIdBytes( + com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + fragmentId_ = value; + onChanged(); + return this; + } + + // optional bytes token = 2; + private com.google.protobuf.ByteString token_ = com.google.protobuf.ByteString.EMPTY; + /** + * <code>optional bytes token = 2;</code> + */ + public boolean hasToken() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * <code>optional bytes token = 2;</code> + */ + public com.google.protobuf.ByteString getToken() { + return token_; + } + /** + * <code>optional bytes token = 2;</code> + */ + public Builder setToken(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + token_ = value; + onChanged(); + return this; + } + /** + * <code>optional bytes token = 2;</code> + */ + public Builder clearToken() { + bitField0_ = (bitField0_ & ~0x00000002); + token_ = getDefaultInstance().getToken(); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:LlapOutputSocketInitMessage) + } + + static { + defaultInstance = new LlapOutputSocketInitMessage(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:LlapOutputSocketInitMessage) + } + /** * Protobuf service {@code LlapDaemonProtocol} */ @@ -17218,6 +17836,11 @@ public final class LlapDaemonProtocolProtos { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_GetTokenResponseProto_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_LlapOutputSocketInitMessage_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_LlapOutputSocketInitMessage_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -17286,23 +17909,25 @@ public final class LlapDaemonProtocolProtos { "Proto\022\"\n\032fragment_identifier_string\030\002 \001(" + "\t\" \n\036TerminateFragmentResponseProto\"&\n\024G" + "etTokenRequestProto\022\016\n\006app_id\030\001 \001(\t\"&\n\025G" + - "etTokenResponseProto\022\r\n\005token\030\001 \001(\014*2\n\020S", - "ourceStateProto\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RU" + - "NNING\020\002*E\n\024SubmissionStateProto\022\014\n\010ACCEP" + - "TED\020\001\022\014\n\010REJECTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316" + - "\002\n\022LlapDaemonProtocol\022?\n\nsubmitWork\022\027.Su" + - "bmitWorkRequestProto\032\030.SubmitWorkRespons" + - "eProto\022W\n\022sourceStateUpdated\022\037.SourceSta" + - "teUpdatedRequestProto\032 .SourceStateUpdat" + - "edResponseProto\022H\n\rqueryComplete\022\032.Query" + - "CompleteRequestProto\032\033.QueryCompleteResp" + - "onseProto\022T\n\021terminateFragment\022\036.Termina", - "teFragmentRequestProto\032\037.TerminateFragme" + - "ntResponseProto2]\n\026LlapManagementProtoco" + - "l\022C\n\022getDelegationToken\022\025.GetTokenReques" + - "tProto\032\026.GetTokenResponseProtoBH\n&org.ap" + - "ache.hadoop.hive.llap.daemon.rpcB\030LlapDa" + - "emonProtocolProtos\210\001\001\240\001\001" + "etTokenResponseProto\022\r\n\005token\030\001 \001(\014\"A\n\033L", + "lapOutputSocketInitMessage\022\023\n\013fragment_i" + + "d\030\001 \002(\t\022\r\n\005token\030\002 \001(\014*2\n\020SourceStatePro" + + "to\022\017\n\013S_SUCCEEDED\020\001\022\r\n\tS_RUNNING\020\002*E\n\024Su" + + 
"bmissionStateProto\022\014\n\010ACCEPTED\020\001\022\014\n\010REJE" + + "CTED\020\002\022\021\n\rEVICTED_OTHER\020\0032\316\002\n\022LlapDaemon" + + "Protocol\022?\n\nsubmitWork\022\027.SubmitWorkReque" + + "stProto\032\030.SubmitWorkResponseProto\022W\n\022sou" + + "rceStateUpdated\022\037.SourceStateUpdatedRequ" + + "estProto\032 .SourceStateUpdatedResponsePro" + + "to\022H\n\rqueryComplete\022\032.QueryCompleteReque", + "stProto\032\033.QueryCompleteResponseProto\022T\n\021" + + "terminateFragment\022\036.TerminateFragmentReq" + + "uestProto\032\037.TerminateFragmentResponsePro" + + "to2]\n\026LlapManagementProtocol\022C\n\022getDeleg" + + "ationToken\022\025.GetTokenRequestProto\032\026.GetT" + + "okenResponseProtoBH\n&org.apache.hadoop.h" + + "ive.llap.daemon.rpcB\030LlapDaemonProtocolP" + + "rotos\210\001\001\240\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -17429,6 +18054,12 @@ public final class LlapDaemonProtocolProtos { com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_GetTokenResponseProto_descriptor, new java.lang.String[] { "Token", }); + internal_static_LlapOutputSocketInitMessage_descriptor = + getDescriptor().getMessageTypes().get(20); + internal_static_LlapOutputSocketInitMessage_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_LlapOutputSocketInitMessage_descriptor, + new java.lang.String[] { "FragmentId", "Token", }); return null; } }; http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java index 58a8e96..f06e963 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java @@ -44,6 +44,8 @@ import org.apache.zookeeper.data.Id; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.protobuf.ByteString; + public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdentifier> implements SigningSecretManager { private static final Logger LOG = LoggerFactory.getLogger(SecretManager.class); @@ -272,4 +274,13 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent } } } + + /** Verifies the token available as serialized bytes. 
*/ + public void verifyToken(byte[] tokenBytes) throws IOException { + if (!UserGroupInformation.isSecurityEnabled()) return; + if (tokenBytes == null) throw new SecurityException("Token required for authentication"); + Token<LlapTokenIdentifier> token = new Token<>(); + token.readFields(new DataInputStream(new ByteArrayInputStream(tokenBytes))); + verifyToken(token.decodeIdentifier(), token.getPassword()); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java ---------------------------------------------------------------------- diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java index dad5e07..01dc2e1 100644 --- a/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java +++ b/llap-common/src/java/org/apache/hadoop/hive/llap/tez/Converters.java @@ -39,6 +39,7 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.api.TaskContext; import org.apache.tez.runtime.api.impl.GroupInputSpec; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; @@ -103,6 +104,18 @@ public class Converters { attemptNum); } + public static TezTaskAttemptID createTaskAttemptId(TaskContext ctx) { + // Come ride the API roller-coaster #2! The best part is that ctx has TezTaskAttemptID inside. + return TezTaskAttemptID.getInstance( + TezTaskID.getInstance( + TezVertexID.getInstance( + TezDAGID.getInstance( + ctx.getApplicationId(), + ctx.getDagIdentifier()), + ctx.getTaskVertexIndex()), + ctx.getTaskIndex()), + ctx.getTaskAttemptNumber()); + } public static VertexIdentifier createVertexIdentifier( TezTaskAttemptID taId, int appAttemptId) { VertexIdentifier.Builder idBuilder = VertexIdentifier.newBuilder(); http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-common/src/protobuf/LlapDaemonProtocol.proto ---------------------------------------------------------------------- diff --git a/llap-common/src/protobuf/LlapDaemonProtocol.proto b/llap-common/src/protobuf/LlapDaemonProtocol.proto index 45d1808..92dda21 100644 --- a/llap-common/src/protobuf/LlapDaemonProtocol.proto +++ b/llap-common/src/protobuf/LlapDaemonProtocol.proto @@ -180,6 +180,12 @@ message GetTokenResponseProto { optional bytes token = 1; } +// The message sent by external client to claim the output from the output socket. 
+message LlapOutputSocketInitMessage { + required string fragment_id = 1; + optional bytes token = 2; +} + service LlapDaemonProtocol { rpc submitWork(SubmitWorkRequestProto) returns (SubmitWorkResponseProto); rpc sourceStateUpdated(SourceStateUpdatedRequestProto) returns (SourceStateUpdatedResponseProto); http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 03b2a54..4d17080 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -16,10 +16,10 @@ */ package org.apache.hadoop.hive.llap; -import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -37,6 +37,7 @@ import org.apache.commons.collections4.ListUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.VertexOrBinary; @@ -48,7 +49,6 @@ import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier; import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -70,13 +70,11 @@ import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; import org.apache.tez.runtime.api.impl.EventType; -import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; import com.google.protobuf.ByteString; @@ -114,6 +112,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> public LlapBaseInputFormat() {} + @SuppressWarnings("unchecked") @Override public RecordReader<NullWritable, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { @@ -148,24 +147,28 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> llapClient.init(job); llapClient.start(); - SubmitWorkRequestProto submitWorkRequestProto = constructSubmitWorkRequestProto( + SubmitWorkRequestProto request = constructSubmitWorkRequestProto( submitWorkInfo, llapSplit.getSplitNum(), llapClient.getAddress(), submitWorkInfo.getToken(), llapSplit.getFragmentBytes(), llapSplit.getFragmentBytesSignature()); - 
llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort); + llapClient.submitWork(request, host, llapSubmitPort); - String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum(); - - // TODO: security for output channel Socket socket = new Socket(host, serviceInstance.getOutputFormatPort()); LOG.debug("Socket connected"); + SignableVertexSpec vertex = SignableVertexSpec.parseFrom(submitWorkInfo.getVertexBinary()); + String fragmentId = Converters.createTaskAttemptId(vertex.getVertexIdentifier(), + request.getFragmentNumber(), request.getAttemptNumber()).toString(); + OutputStream socketStream = socket.getOutputStream(); + LlapOutputSocketInitMessage.Builder builder = + LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId); + if (llapSplit.getTokenBytes() != null) { + builder.setToken(ByteString.copyFrom(llapSplit.getTokenBytes())); + } + builder.build().writeDelimitedTo(socketStream); + socketStream.flush(); - socket.getOutputStream().write(id.getBytes()); - socket.getOutputStream().write(0); - socket.getOutputStream().flush(); - - LOG.info("Registered id: " + id); + LOG.info("Registered id: " + fragmentId); @SuppressWarnings("rawtypes") LlapBaseRecordReader recordReader = new LlapBaseRecordReader( @@ -291,7 +294,6 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> String user = System.getenv(ApplicationConstants.Environment.USER.name()); LOG.info("Setting user in submitWorkRequest to: " + user); - // TODO: this is bogus. What does LLAP use this for? ContainerId containerId = ContainerId.newInstance(ApplicationAttemptId.newInstance(appId, 0), taskNum); http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index c1ef0f4..c7e9d32 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.security.LlapUgiFactoryFactory; +import org.apache.hadoop.hive.llap.security.SecretManager; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.UDF; @@ -88,6 +89,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla public static final String LLAP_HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2-llapdaemon.properties"; public static final String HADOOP_METRICS2_PROPERTIES_FILE = "hadoop-metrics2.properties"; private final Configuration shuffleHandlerConf; + private final SecretManager secretManager; private final LlapProtocolServerImpl server; private final ContainerRunnerImpl containerRunner; private final AMReporter amReporter; @@ -245,7 +247,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf); - this.server = new LlapProtocolServerImpl( + SecretManager sm = null; + if 
(UserGroupInformation.isSecurityEnabled()) { + sm = SecretManager.createSecretManager(daemonConf, daemonId.getClusterString()); + } + this.secretManager = sm; + this.server = new LlapProtocolServerImpl(secretManager, numHandlers, this, srvAddress, mngAddress, srvPort, mngPort, daemonId); UgiFactory fsUgiFactory = null; @@ -348,7 +355,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla this.shufflePort.set(ShuffleHandler.get().getPort()); getConfig() .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort()); - LlapOutputFormatService.initializeAndStart(getConfig()); + LlapOutputFormatService.initializeAndStart(getConfig(), secretManager); super.serviceStart(); // Setup the actual ports in the configuration. http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java index 7ccd28f..eb7a8eb 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapProtocolServerImpl.java @@ -72,18 +72,19 @@ public class LlapProtocolServerImpl extends AbstractService private final int srvPort, mngPort; private RPC.Server server, mngServer; private final AtomicReference<InetSocketAddress> srvAddress, mngAddress; - private SecretManager zkSecretManager; + private final SecretManager secretManager; private String clusterUser = null; private boolean isRestrictedToClusterUser = false; private final DaemonId daemonId; private TokenRequiresSigning isSigningRequiredConfig = TokenRequiresSigning.TRUE; - public LlapProtocolServerImpl(int numHandlers, ContainerRunner containerRunner, - AtomicReference<InetSocketAddress> srvAddress, AtomicReference<InetSocketAddress> mngAddress, - int srvPort, int mngPort, DaemonId daemonId) { + public LlapProtocolServerImpl(SecretManager secretManager, int numHandlers, + ContainerRunner containerRunner, AtomicReference<InetSocketAddress> srvAddress, + AtomicReference<InetSocketAddress> mngAddress, int srvPort, int mngPort, DaemonId daemonId) { super("LlapDaemonProtocolServerImpl"); this.numHandlers = numHandlers; this.containerRunner = containerRunner; + this.secretManager = secretManager; this.srvAddress = srvAddress; this.srvPort = srvPort; this.mngAddress = mngAddress; @@ -156,8 +157,6 @@ public class LlapProtocolServerImpl extends AbstractService } String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL), llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE); - zkSecretManager = SecretManager.createSecretManager( - conf, llapPrincipal, llapKeytab, daemonId.getClusterString()); // Start the protocol server after properly authenticating with daemon keytab. 
UserGroupInformation daemonUgi = null; @@ -269,8 +268,8 @@ public class LlapProtocolServerImpl extends AbstractService .setBindAddress(addr.getHostName()) .setPort(addr.getPort()) .setNumHandlers(numHandlers); - if (zkSecretManager != null) { - builder = builder.setSecretManager(zkSecretManager); + if (secretManager != null) { + builder = builder.setSecretManager(secretManager); } RPC.Server server = builder.build(); if (isSecurityEnabled) { @@ -283,7 +282,7 @@ public class LlapProtocolServerImpl extends AbstractService @Override public GetTokenResponseProto getDelegationToken(RpcController controller, GetTokenRequestProto request) throws ServiceException { - if (zkSecretManager == null) { + if (secretManager == null) { throw new ServiceException("Operation not supported on unsecure cluster"); } UserGroupInformation callingUser = null; @@ -292,7 +291,7 @@ public class LlapProtocolServerImpl extends AbstractService callingUser = UserGroupInformation.getCurrentUser(); // Determine if the user would need to sign fragments. boolean isSigningRequired = determineIfSigningIsRequired(callingUser); - token = zkSecretManager.createLlapToken( + token = secretManager.createLlapToken( request.hasAppId() ? request.getAppId() : null, null, isSigningRequired); } catch (IOException e) { throw new ServiceException(e); http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java index fd37a06..b38e9d6 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestLlapDaemonProtocolServerImpl.java @@ -44,7 +44,7 @@ public class TestLlapDaemonProtocolServerImpl { int numHandlers = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_RPC_NUM_HANDLERS); ContainerRunner containerRunnerMock = mock(ContainerRunner.class); LlapProtocolServerImpl server = - new LlapProtocolServerImpl(numHandlers, containerRunnerMock, + new LlapProtocolServerImpl(null, numHandlers, containerRunnerMock, new AtomicReference<InetSocketAddress>(), new AtomicReference<InetSocketAddress>(), rpcPort, rpcPort + 1, null); when(containerRunnerMock.submitWork(any(SubmitWorkRequestProto.class))).thenReturn( http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java index 8e98aba..beb0e2b 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormat.java @@ -34,9 +34,6 @@ import org.apache.hadoop.hive.llap.io.api.LlapProxy; import com.google.common.base.Preconditions; -/** - * - */ public class LlapOutputFormat<K extends Writable, V extends Writable> implements OutputFormat<K, V> { http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 06660b3..151a31f 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -19,32 +19,21 @@ package org.apache.hadoop.hive.llap; import java.util.Map; import java.util.HashMap; import java.io.IOException; -import java.io.OutputStream; import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage; +import org.apache.hadoop.hive.llap.security.SecretManager; import com.google.common.base.Preconditions; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,15 +47,14 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.DelimiterBasedFrameDecoder; -import io.netty.handler.codec.Delimiters; -import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.string.StringEncoder; -import io.netty.util.concurrent.Future; /** - * Responsible for sending back result set data to the connections made by external clients via the LLAP input format. + * Responsible for sending back result set data to the connections + * made by external clients via the LLAP input format. */ public class LlapOutputFormatService { @@ -76,24 +64,30 @@ public class LlapOutputFormatService { private static final AtomicBoolean initing = new AtomicBoolean(false); private static LlapOutputFormatService INSTANCE; - private final Map<String, RecordWriter> writers; + // TODO: the global lock might be too coarse here. 
+ private final Object lock = new Object(); + private final Map<String, RecordWriter<?,?>> writers = new HashMap<String, RecordWriter<?,?>>(); + private final Map<String, String> errors = new HashMap<String, String>(); private final Configuration conf; private static final int WAIT_TIME = 5; - private static final int MAX_QUERY_ID_LENGTH = 256; private EventLoopGroup eventLoopGroup; private ServerBootstrap serverBootstrap; private ChannelFuture listeningChannelFuture; private int port; + private final SecretManager sm; + private final long writerTimeoutMs; - private LlapOutputFormatService(Configuration conf) throws IOException { - writers = new HashMap<String, RecordWriter>(); + private LlapOutputFormatService(Configuration conf, SecretManager sm) throws IOException { + this.sm = sm; this.conf = conf; + this.writerTimeoutMs = HiveConf.getTimeVar( + conf, ConfVars.LLAP_DAEMON_OUTPUT_STREAM_TIMEOUT, TimeUnit.MILLISECONDS); } - public static void initializeAndStart(Configuration conf) throws Exception { + public static void initializeAndStart(Configuration conf, SecretManager sm) throws Exception { if (!initing.getAndSet(true)) { - INSTANCE = new LlapOutputFormatService(conf); + INSTANCE = new LlapOutputFormatService(conf, sm); INSTANCE.start(); started.set(true); } @@ -136,50 +130,96 @@ public class LlapOutputFormatService { LOG.warn("LlapOutputFormatService does not appear to have a listening port to close."); } - Future terminationFuture = eventLoopGroup.shutdownGracefully(1, WAIT_TIME, TimeUnit.SECONDS); - terminationFuture.sync(); + eventLoopGroup.shutdownGracefully(1, WAIT_TIME, TimeUnit.SECONDS).sync(); } + @SuppressWarnings("unchecked") public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException { - RecordWriter writer = null; - synchronized(INSTANCE) { + RecordWriter<?, ?> writer = null; + synchronized (lock) { + long startTime = System.nanoTime(); + boolean isFirst = true; while ((writer = writers.get(id)) == null) { - LOG.info("Waiting for writer for: "+id); - INSTANCE.wait(); + String error = errors.remove(id); + if (error != null) { + throw new IOException(error); + } + if (isFirst) { + LOG.info("Waiting for writer for " + id); + isFirst = false; + } + if (((System.nanoTime() - startTime) / 1000000) > writerTimeoutMs) { + throw new IOException("The writer for " + id + " has timed out after " + + writerTimeoutMs + "ms"); + } + lock.wait(writerTimeoutMs); } } LOG.info("Returning writer for: "+id); - return writer; + return (RecordWriter<K, V>) writer; } public int getPort() { return port; } - protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> { + protected class LlapOutputFormatServiceHandler + extends SimpleChannelInboundHandler<LlapOutputSocketInitMessage> { private final int sendBufferSize; public LlapOutputFormatServiceHandler(final int sendBufferSize) { this.sendBufferSize = sendBufferSize; } @Override - public void channelRead0(ChannelHandlerContext ctx, String msg) { - String id = msg; - registerReader(ctx, id); + public void channelRead0(ChannelHandlerContext ctx, LlapOutputSocketInitMessage msg) { + String id = msg.getFragmentId(); + byte[] tokenBytes = msg.hasToken() ? msg.getToken().toByteArray() : null; + try { + registerReader(ctx, id, tokenBytes); + } catch (Throwable t) { + // Make sure we fail the channel if something goes wrong. + // We internally handle all the "expected" exceptions, so log a lot of information here. 
+ failChannel(ctx, id, StringUtils.stringifyException(t)); + } } - private void registerReader(ChannelHandlerContext ctx, String id) { - synchronized(INSTANCE) { - LOG.debug("registering socket for: " + id); - OutputStream stream = new ChannelOutputStream(ctx, id, sendBufferSize); - LlapRecordWriter writer = new LlapRecordWriter(stream); - writers.put(id, writer); - - // Add listener to handle any cleanup for when the connection is closed - ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id)); + private void registerReader(ChannelHandlerContext ctx, String id, byte[] tokenBytes) { + if (sm != null) { + try { + sm.verifyToken(tokenBytes); + } catch (SecurityException | IOException ex) { + failChannel(ctx, id, ex.getMessage()); + return; + } + } + LOG.debug("registering socket for: " + id); + @SuppressWarnings("rawtypes") + LlapRecordWriter writer = new LlapRecordWriter( + new ChannelOutputStream(ctx, id, sendBufferSize)); + boolean isFailed = true; + synchronized (lock) { + if (!writers.containsKey(id)) { + isFailed = false; + writers.put(id, writer); + // Add listener to handle any cleanup for when the connection is closed + ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id)); + lock.notifyAll(); + } + } + if (isFailed) { + failChannel(ctx, id, "Writer already registered for " + id); + } + } - INSTANCE.notifyAll(); + /** Do not call under lock. */ + private void failChannel(ChannelHandlerContext ctx, String id, String error) { + // TODO: write error to the channel? there's no mechanism for that now. + ctx.close(); + synchronized (lock) { + errors.put(id, error); + lock.notifyAll(); } + LOG.error(error); } } @@ -192,8 +232,7 @@ public class LlapOutputFormatService { @Override public void operationComplete(ChannelFuture future) throws Exception { - RecordWriter writer = null; - + RecordWriter<?, ?> writer = null; synchronized (INSTANCE) { writer = writers.remove(id); } @@ -213,8 +252,8 @@ public class LlapOutputFormatService { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( - new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()), - new StringDecoder(), + new ProtobufVarint32FrameDecoder(), + new ProtobufDecoder(LlapOutputSocketInitMessage.getDefaultInstance()), new StringEncoder(), new LlapOutputFormatServiceHandler(sendBufferSize)); } http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index f4a9cac..cc2ab39 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.AbstractMapOperator; import org.apache.hadoop.hive.llap.io.api.LlapProxy; +import org.apache.hadoop.hive.llap.tez.Converters; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.llap.LlapOutputFormat; import org.apache.hadoop.hive.ql.exec.DummyStoreOperator; @@ -94,11 +95,9 @@ public class MapRecordProcessor extends RecordProcessor { public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) throws 
Exception { super(jconf, context); String queryId = HiveConf.getVar(jconf, HiveConf.ConfVars.HIVEQUERYID); - if (LlapProxy.isDaemon()) { // do not cache plan - String id = queryId + "_" + context.getTaskIndex(); - l4j.info("LLAP_OF_ID: "+id); - jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, id); - cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); + if (LlapProxy.isDaemon()) { + cache = new org.apache.hadoop.hive.ql.exec.mr.ObjectCache(); // do not cache plan + setLlapOfFragmentId(context); } else { cache = ObjectCacheFactory.getCache(jconf, queryId); } @@ -108,6 +107,15 @@ public class MapRecordProcessor extends RecordProcessor { nRows = 0; } + private void setLlapOfFragmentId(final ProcessorContext context) { + // TODO: could we do this only if the OF is actually used? + String attemptId = Converters.createTaskAttemptId(context).toString(); + if (l4j.isDebugEnabled()) { + l4j.debug("Setting the LLAP fragment ID for OF to " + attemptId); + } + jconf.set(LlapOutputFormat.LLAP_OF_ID_KEY, attemptId); + } + @Override void init(MRTaskReporter mrReporter, Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java index 9c385d1..37ae668 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.TableScanOperator; @@ -93,6 +94,8 @@ public final class PlanUtils { public static enum ExpressionTypes { FIELD, JEXL }; + public static final String LLAP_OUTPUT_FORMAT_KEY = "Llap"; + private static final String LLAP_OF_SH_CLASS = "org.apache.hadoop.hive.llap.LlapStorageHandler"; public static synchronized long getCountForMapJoinDumpFilePrefix() { return countForMapJoinDumpFilePrefix++; @@ -230,7 +233,7 @@ public final class PlanUtils { return getTableDesc(serdeClass, separatorCode, columns, columnTypes, lastColumnTakesRestOfTheLine, useDelimitedJSON, "TextFile"); - } + } public static TableDesc getTableDesc( Class<? 
http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 9c385d1..37ae668 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -93,6 +94,8 @@ public final class PlanUtils {
 
   public static enum ExpressionTypes {
     FIELD, JEXL
   };
 
+  public static final String LLAP_OUTPUT_FORMAT_KEY = "Llap";
+  private static final String LLAP_OF_SH_CLASS = "org.apache.hadoop.hive.llap.LlapStorageHandler";
+
   public static synchronized long getCountForMapJoinDumpFilePrefix() {
     return countForMapJoinDumpFilePrefix++;
   }
@@ -230,7 +233,7 @@ public final class PlanUtils {
     return getTableDesc(serdeClass, separatorCode, columns, columnTypes,
         lastColumnTakesRestOfTheLine, useDelimitedJSON, "TextFile");
-  }
+  }
 
   public static TableDesc getTableDesc(
       Class<? extends Deserializer> serdeClass, String separatorCode,
@@ -272,12 +275,10 @@ public final class PlanUtils {
       inputFormat = RCFileInputFormat.class;
       outputFormat = RCFileOutputFormat.class;
       assert serdeClass == ColumnarSerDe.class;
-    } else if ("Llap".equalsIgnoreCase(fileFormat)) {
+    } else if (LLAP_OUTPUT_FORMAT_KEY.equalsIgnoreCase(fileFormat)) {
       inputFormat = TextInputFormat.class;
       outputFormat = LlapOutputFormat.class;
-      properties.setProperty(
-          org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
-          "org.apache.hadoop.hive.llap.LlapStorageHandler");
+      properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, LLAP_OF_SH_CLASS);
     } else { // use TextFile by default
       inputFormat = TextInputFormat.class;
       outputFormat = IgnoreKeyTextOutputFormat.class;
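The string literal "Llap" is now centralized as PlanUtils.LLAP_OUTPUT_FORMAT_KEY; setting the query result file format to that key is what routes results through LlapOutputFormat in getTableDesc. A one-line sketch of the intended usage, mirroring createPlanFragment in the next file (conf is assumed to be a HiveConf the caller already owns):

    // Assumed caller-side usage; resolves to LlapOutputFormat in getTableDesc.
    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT,
        PlanUtils.LLAP_OUTPUT_FORMAT_KEY);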
http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
index e2f0e84..a2ad4f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDTFGetSplits.java
@@ -41,13 +41,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.LlapInputSplit;
-import org.apache.hadoop.hive.llap.LlapOutputFormat;
 import org.apache.hadoop.hive.llap.NotTezEventHelper;
-import org.apache.hadoop.hive.llap.SubmitWorkInfo;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.llap.Schema;
-import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.SubmitWorkInfo;
 import org.apache.hadoop.hive.llap.TypeDesc;
 import org.apache.hadoop.hive.llap.coordinator.LlapCoordinator;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec;
@@ -57,6 +55,7 @@ import org.apache.hadoop.hive.llap.security.LlapSigner.SignedMessage;
 import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
 import org.apache.hadoop.hive.llap.security.LlapTokenLocalClient;
 import org.apache.hadoop.hive.llap.tez.Converters;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver;
@@ -71,6 +70,7 @@ import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -81,13 +81,12 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.SplitLocationInfo;
@@ -115,7 +114,7 @@ import com.google.common.base.Preconditions;
 
 /**
  * GenericUDTFGetSplits.
- *
+ *
 */
 @Description(name = "get_splits", value = "_FUNC_(string,int) - "
     + "Returns an array of length int serialized splits for the referenced tables string.")
@@ -131,7 +130,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
   @Override
   public StructObjectInspector initialize(ObjectInspector[] arguments)
-      throws UDFArgumentException {
+      throws UDFArgumentException {
 
     LOG.debug("initializing GenericUDFGetSplits");
 
@@ -142,14 +141,15 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     LOG.debug("Initialized conf, jc and metastore connection");
 
     if (arguments.length != 2) {
-      throw new UDFArgumentLengthException("The function GET_SPLITS accepts 2 arguments.");
+      throw new UDFArgumentLengthException(
+          "The function GET_SPLITS accepts 2 arguments.");
     } else if (!(arguments[0] instanceof StringObjectInspector)) {
-      LOG.error("Got "+arguments[0].getTypeName()+" instead of string.");
+      LOG.error("Got " + arguments[0].getTypeName() + " instead of string.");
       throw new UDFArgumentTypeException(0, "\"" + "string\" is expected at function GET_SPLITS, "
           + "but \"" + arguments[0].getTypeName() + "\" is found");
     } else if (!(arguments[1] instanceof IntObjectInspector)) {
-      LOG.error("Got "+arguments[1].getTypeName()+" instead of int.");
+      LOG.error("Got " + arguments[1].getTypeName() + " instead of int.");
       throw new UDFArgumentTypeException(1, "\"" + "int\" is expected at function GET_SPLITS, "
           + "but \"" + arguments[1].getTypeName() + "\" is found");
@@ -159,9 +159,10 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     intOI = (IntObjectInspector) arguments[1];
 
     List<String> names = Arrays.asList("split");
-    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
-        PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
-    StructObjectInspector outputOI = ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);
+    List<ObjectInspector> fieldOIs = Arrays
+        .<ObjectInspector> asList(PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector);
+    StructObjectInspector outputOI = ObjectInspectorFactory
+        .getStandardStructObjectInspector(names, fieldOIs);
 
     LOG.debug("done initializing GenericUDFGetSplits");
     return outputOI;
@@ -190,7 +191,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     Schema schema = fragment.schema;
 
     try {
-      for (InputSplit s: getSplits(jc, num, tezWork, schema)) {
+      for (InputSplit s : getSplits(jc, num, tezWork, schema)) {
         Object[] os = new Object[1];
         bos.reset();
         s.write(dos);
@@ -198,24 +199,26 @@
         os[0] = frozen;
         forward(os);
       }
-    } catch(Exception e) {
+    } catch (Exception e) {
       throw new HiveException(e);
     }
   }
 
-  public PlanFragment createPlanFragment(String query, int num) throws HiveException {
+  public PlanFragment createPlanFragment(String query, int num)
+      throws HiveException {
 
     HiveConf conf = new HiveConf(SessionState.get().getConf());
     HiveConf.setVar(conf, ConfVars.HIVEFETCHTASKCONVERSION, "none");
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, "Llap");
+    HiveConf.setVar(conf, ConfVars.HIVEQUERYRESULTFILEFORMAT, PlanUtils.LLAP_OUTPUT_FORMAT_KEY);
 
-    String originalMode = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE);
-    HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap");
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS, true);
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS, true);
+    String originalMode = HiveConf.getVar(conf,
+        ConfVars.HIVE_EXECUTION_MODE);
+    HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, "llap");
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS, true);
+    HiveConf.setBoolVar(conf, ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS, true);
     conf.setBoolean(TezSplitGrouper.TEZ_GROUPING_NODE_LOCAL_ONLY, true);
     // Tez/LLAP requires RPC query plan
-    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_RPC_QUERY_PLAN, true);
+    HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
 
     try {
       jc = DagUtils.getInstance().createConfiguration(conf);
@@ -224,14 +227,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
     }
 
     Driver driver = new Driver(conf);
-    CommandProcessorResponse cpr;
-
-    LOG.info("setting fetch.task.conversion to none and query file format to \""
-        + LlapOutputFormat.class.getName()+"\"");
-
-    cpr = driver.compileAndRespond(query);
-    if(cpr.getResponseCode() != 0) {
-      throw new HiveException("Failed to compile query: "+cpr.getException());
+    CommandProcessorResponse cpr = driver.compileAndRespond(query);
+    if (cpr.getResponseCode() != 0) {
+      throw new HiveException("Failed to compile query: " + cpr.getException());
     }
 
     QueryPlan plan = driver.getPlan();
@@ -248,11 +246,11 @@ public class GenericUDTFGetSplits extends GenericUDTF {
       String tableName = "table_"+UUID.randomUUID().toString().replaceAll("[^A-Za-z0-9 ]", "");
 
-      String ctas = "create temporary table "+tableName+" as "+query;
-      LOG.info("CTAS: "+ctas);
+      String ctas = "create temporary table " + tableName + " as " + query;
+      LOG.info("Materializing the query for LLAPIF; CTAS: " + ctas);
 
       try {
-        HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, originalMode);
+        HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, originalMode);
         cpr = driver.run(ctas, false);
       } catch (CommandNeedRetryException e) {
         throw new HiveException(e);
@@ -262,7 +260,7 @@ public class GenericUDTFGetSplits extends GenericUDTF {
         throw new HiveException("Failed to create temp table: " + cpr.getException());
       }
 
-      HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE, "llap");
+      HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_MODE, "llap");
       query = "select * from " + tableName;
       cpr = driver.compileAndRespond(query);
       if(cpr.getResponseCode() != 0) {
@@ -310,9 +308,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
     // we have the dag now proceed to get the splits:
     Preconditions.checkState(HiveConf.getBoolVar(wxConf,
-        HiveConf.ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
+        ConfVars.HIVE_TEZ_GENERATE_CONSISTENT_SPLITS));
     Preconditions.checkState(HiveConf.getBoolVar(wxConf,
-        HiveConf.ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
+        ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS));
     HiveSplitGenerator splitGenerator = new HiveSplitGenerator(wxConf, mapWork);
     List<Event> eventList = splitGenerator.initialize();
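For orientation, this UDTF is what an external client calls to obtain serialized splits: each row it forwards is one byte[] split, per the javaByteArrayObjectInspector set up in initialize(). A hypothetical JDBC-side sketch of that usage; the URL, credentials, and table name are placeholders, and the exact result shape on the wire may differ:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class GetSplitsSketch {
      public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection(
            "jdbc:hive2://localhost:10000/default", "hive", ""); // placeholders
        Statement stmt = conn.createStatement();
        // Each returned row is expected to hold one serialized LlapInputSplit.
        ResultSet rs = stmt.executeQuery(
            "select get_splits(\"select * from some_table\", 5)");
        while (rs.next()) {
          byte[] serializedSplit = rs.getBytes(1);
          // hand off to the LLAP input format on the client side...
        }
      }
    }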
@@ -457,9 +455,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   }
 
   /**
-   * Returns a local resource representing a jar. This resource will be used to execute the plan on
-   * the cluster.
-   *
+   * Returns a local resource representing a jar. This resource will be used to
+   * execute the plan on the cluster.
+   *
    * @param localJarPath
    *          Local path to the jar to be localized.
    * @return LocalResource corresponding to the localized hive exec resource.
@@ -470,9 +468,9 @@ public class GenericUDTFGetSplits extends GenericUDTF {
    * @throws URISyntaxException
    *           when current jar location cannot be determined.
    */
-  private LocalResource createJarLocalResource(String localJarPath, DagUtils utils,
-      Configuration conf)
-      throws IOException, LoginException, IllegalArgumentException, FileNotFoundException {
+  private LocalResource createJarLocalResource(String localJarPath,
+      DagUtils utils, Configuration conf) throws IOException, LoginException,
+      IllegalArgumentException, FileNotFoundException {
     FileStatus destDirStatus = utils.getHiveJarDirectory(conf);
     assert destDirStatus != null;
     Path destDirPath = destDirStatus.getPath();
@@ -482,19 +480,24 @@ public class GenericUDTFGetSplits extends GenericUDTF {
 
     String destFileName = localFile.getName();
 
-    // Now, try to find the file based on SHA and name. Currently we require exact name match.
-    // We could also allow cutting off versions and other stuff provided that SHA matches...
+    // Now, try to find the file based on SHA and name. Currently we require
+    // exact name match.
+    // We could also allow cutting off versions and other stuff provided that
+    // SHA matches...
     destFileName = FilenameUtils.removeExtension(destFileName) + "-" + sha
-        + FilenameUtils.EXTENSION_SEPARATOR + FilenameUtils.getExtension(destFileName);
+        + FilenameUtils.EXTENSION_SEPARATOR
+        + FilenameUtils.getExtension(destFileName);
 
-    // TODO: if this method is ever called on more than one jar, getting the dir and the
+    // TODO: if this method is ever called on more than one jar, getting the dir
+    // and the
     // list need to be refactored out to be done only once.
     Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
-    return utils.localizeResource(localFile, destFile, LocalResourceType.FILE, conf);
+    return utils.localizeResource(localFile, destFile, LocalResourceType.FILE,
+        conf);
   }
 
-  private String getSha(Path localFile, Configuration conf)
-      throws IOException, IllegalArgumentException {
+  private String getSha(Path localFile, Configuration conf) throws IOException,
+      IllegalArgumentException {
     InputStream is = null;
     try {
       FileSystem localFs = FileSystem.getLocal(conf);
@@ -510,57 +513,60 @@ public class GenericUDTFGetSplits extends GenericUDTF {
   private TypeDesc convertTypeString(String typeString) throws HiveException {
     TypeDesc typeDesc;
     TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
-    Preconditions.checkState(typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE,
+    Preconditions.checkState(
+        typeInfo.getCategory() == ObjectInspector.Category.PRIMITIVE,
         "Unsupported non-primitive type " + typeString);
 
     switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
-      case BOOLEAN:
-        typeDesc = new TypeDesc(TypeDesc.Type.BOOLEAN);
-        break;
-      case BYTE:
-        typeDesc = new TypeDesc(TypeDesc.Type.TINYINT);
-        break;
-      case SHORT:
-        typeDesc = new TypeDesc(TypeDesc.Type.SMALLINT);
-        break;
-      case INT:
-        typeDesc = new TypeDesc(TypeDesc.Type.INT);
-        break;
-      case LONG:
-        typeDesc = new TypeDesc(TypeDesc.Type.BIGINT);
-        break;
-      case FLOAT:
-        typeDesc = new TypeDesc(TypeDesc.Type.FLOAT);
-        break;
-      case DOUBLE:
-        typeDesc = new TypeDesc(TypeDesc.Type.DOUBLE);
-        break;
-      case STRING:
-        typeDesc = new TypeDesc(TypeDesc.Type.STRING);
-        break;
-      case CHAR:
-        CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.CHAR, charTypeInfo.getLength());
-        break;
-      case VARCHAR:
-        VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR, varcharTypeInfo.getLength());
-        break;
-      case DATE:
-        typeDesc = new TypeDesc(TypeDesc.Type.DATE);
-        break;
-      case TIMESTAMP:
-        typeDesc = new TypeDesc(TypeDesc.Type.TIMESTAMP);
-        break;
-      case BINARY:
-        typeDesc = new TypeDesc(TypeDesc.Type.BINARY);
-        break;
-      case DECIMAL:
-        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
-        typeDesc = new TypeDesc(TypeDesc.Type.DECIMAL, decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
-        break;
-      default:
-        throw new HiveException("Unsupported type " + typeString);
+    case BOOLEAN:
+      typeDesc = new TypeDesc(TypeDesc.Type.BOOLEAN);
+      break;
+    case BYTE:
+      typeDesc = new TypeDesc(TypeDesc.Type.TINYINT);
+      break;
+    case SHORT:
+      typeDesc = new TypeDesc(TypeDesc.Type.SMALLINT);
+      break;
+    case INT:
+      typeDesc = new TypeDesc(TypeDesc.Type.INT);
+      break;
+    case LONG:
+      typeDesc = new TypeDesc(TypeDesc.Type.BIGINT);
+      break;
+    case FLOAT:
+      typeDesc = new TypeDesc(TypeDesc.Type.FLOAT);
+      break;
+    case DOUBLE:
+      typeDesc = new TypeDesc(TypeDesc.Type.DOUBLE);
+      break;
+    case STRING:
+      typeDesc = new TypeDesc(TypeDesc.Type.STRING);
+      break;
+    case CHAR:
+      CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo;
+      typeDesc = new TypeDesc(TypeDesc.Type.CHAR, charTypeInfo.getLength());
+      break;
+    case VARCHAR:
+      VarcharTypeInfo varcharTypeInfo = (VarcharTypeInfo) typeInfo;
+      typeDesc = new TypeDesc(TypeDesc.Type.VARCHAR,
+          varcharTypeInfo.getLength());
+      break;
+    case DATE:
+      typeDesc = new TypeDesc(TypeDesc.Type.DATE);
+      break;
+    case TIMESTAMP:
+      typeDesc = new TypeDesc(TypeDesc.Type.TIMESTAMP);
+      break;
+    case BINARY:
+      typeDesc = new TypeDesc(TypeDesc.Type.BINARY);
+      break;
+    case DECIMAL:
+      DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+      typeDesc = new TypeDesc(TypeDesc.Type.DECIMAL,
+          decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+      break;
+    default:
+      throw new HiveException("Unsupported type " + typeString);
     }
 
     return typeDesc;
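A quick illustration of what the reindented switch above computes; the constructor arities follow the code in this hunk, and the input strings are examples only:

    // Hypothetical inputs -> results for convertTypeString, per the switch above:
    TypeDesc a = new TypeDesc(TypeDesc.Type.INT);            // "int"
    TypeDesc b = new TypeDesc(TypeDesc.Type.VARCHAR, 10);    // "varchar(10)"
    TypeDesc c = new TypeDesc(TypeDesc.Type.DECIMAL, 10, 2); // "decimal(10,2)"
    // Non-primitive type strings (e.g. "array<int>") fail the Preconditions
    // check before the switch is reached.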
http://git-wip-us.apache.org/repos/asf/hive/blob/0d69a88b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
index 1d592fb..577037c 100644
--- a/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/llap/TestLlapOutputFormat.java
@@ -27,35 +27,21 @@ import java.net.Socket;
 
 import java.io.OutputStream;
 import java.io.InputStream;
-import java.io.File;
 import java.io.IOException;
-import java.io.FileInputStream;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import junit.framework.TestCase;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapBaseRecordReader.ReaderEvent;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 
-
 public class TestLlapOutputFormat {
 
   private static final Logger LOG = LoggerFactory.getLogger(TestLlapOutputFormat.class);
@@ -68,7 +54,7 @@ public class TestLlapOutputFormat {
     Configuration conf = new Configuration();
     // Pick random avail port
     HiveConf.setIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, 0);
-    LlapOutputFormatService.initializeAndStart(conf);
+    LlapOutputFormatService.initializeAndStart(conf, null);
     service = LlapOutputFormatService.get();
     LlapProxy.setDaemon(true);
     LOG.debug("Output service up");
@@ -86,7 +72,7 @@ public class TestLlapOutputFormat {
     JobConf job = new JobConf();
 
     for (int k = 0; k < 5; ++k) {
-      String id = "foobar"+k;
+      String id = "foobar" + k;
       job.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
 
       LlapOutputFormat format = new LlapOutputFormat();
@@ -95,9 +81,10 @@
 
       LOG.debug("Socket connected");
 
-      socket.getOutputStream().write(id.getBytes());
-      socket.getOutputStream().write(0);
-      socket.getOutputStream().flush();
+      OutputStream socketStream = socket.getOutputStream();
+      LlapOutputSocketInitMessage.newBuilder()
+          .setFragmentId(id).build().writeDelimitedTo(socketStream);
+      socketStream.flush();
 
       Thread.sleep(3000);
 
@@ -131,7 +118,38 @@ public class TestLlapOutputFormat {
 
       reader.close();
 
-      Assert.assertEquals(count,10);
+      Assert.assertEquals(10, count);
+    }
+  }
+
+
+  @Test
+  public void testBadClientMessage() throws Exception {
+    JobConf job = new JobConf();
+    String id = "foobar";
+    job.set(LlapOutputFormat.LLAP_OF_ID_KEY, id);
+
+    LlapOutputFormat format = new LlapOutputFormat();
+
+    Socket socket = new Socket("localhost", service.getPort());
+
+    LOG.debug("Socket connected");
+
+    OutputStream socketStream = socket.getOutputStream();
+    LlapOutputSocketInitMessage.newBuilder()
+        .setFragmentId(id).build().writeDelimitedTo(socketStream);
+    LlapOutputSocketInitMessage.newBuilder()
+        .setFragmentId(id).build().writeDelimitedTo(socketStream);
+    socketStream.flush();
+
+    Thread.sleep(3000);
+
+    LOG.debug("Data written");
+
+    try {
+      format.getRecordWriter(null, job, null, null);
+      Assert.fail("Didn't throw");
+    } catch (IOException ex) {
+      // Expected.
+    }
+  }
 }
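Note what the negative test above exercises: a second init message on the same channel is a protocol violation, the daemon closes the socket, and the pending getRecordWriter call fails. On secure clusters the init message must additionally carry the LLAP token introduced by this patch, which the daemon verifies via SecretManager.verifyToken() before registering the reader. A hedged sketch of that secure handshake; how the client obtains the serialized token bytes is an assumption here (e.g. from the credentials shipped with the split), not something this diff shows:

    import java.io.IOException;
    import java.net.Socket;
    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;

    public class SecureHandshakeSketch {
      /** Sends the init message once; tokenBytes may be null on unsecure clusters. */
      static void sendInit(Socket socket, String fragmentId, byte[] tokenBytes)
          throws IOException {
        LlapOutputSocketInitMessage.Builder msg =
            LlapOutputSocketInitMessage.newBuilder().setFragmentId(fragmentId);
        if (tokenBytes != null) {
          // token is optional in the proto; the daemon side checks it via
          // SecretManager.verifyToken() when security is enabled.
          msg.setToken(ByteString.copyFrom(tokenBytes));
        }
        msg.build().writeDelimitedTo(socket.getOutputStream());
      }
    }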