HIVE-19307: Support ArrowOutputStream in LlapOutputFormatService (Eric Wohlstadter, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f7f90a04 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f7f90a04 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f7f90a04 Branch: refs/heads/branch-3 Commit: f7f90a044499739a2bd6a3ea543f70cb59e3f870 Parents: 2726f30 Author: Jason Dere <jd...@hortonworks.com> Authored: Tue May 15 14:25:40 2018 -0700 Committer: Vineet Garg <vg...@apache.org> Committed: Tue May 29 13:58:16 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 + .../hadoop/hive/llap/LlapArrowRecordWriter.java | 70 +++++++++++ .../hive/llap/LlapOutputFormatService.java | 11 +- .../hive/llap/WritableByteChannelAdapter.java | 125 +++++++++++++++++++ .../hadoop/hive/ql/exec/FileSinkOperator.java | 26 ++-- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 28 +++-- .../hadoop/hive/ql/plan/FileSinkDesc.java | 12 +- 7 files changed, 251 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 128e892..8780374 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -397,6 +397,7 @@ public class HiveConf extends Configuration { llapDaemonVarsSetLocal.add(ConfVars.LLAP_VALIDATE_ACLS.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_LOGGER.varname); llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_AM_USE_FQDN.varname); + llapDaemonVarsSetLocal.add(ConfVars.LLAP_OUTPUT_FORMAT_ARROW.varname); } /** @@ -4160,6 +4161,8 @@ public class HiveConf extends 
Configuration { Constants.LLAP_LOGGER_NAME_RFA, Constants.LLAP_LOGGER_NAME_CONSOLE), "logger used for llap-daemons."), + LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false, + "Whether LLapOutputFormatService should output arrow batches"), HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms", new TimeValidator(TimeUnit.MILLISECONDS), http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java new file mode 100644 index 0000000..1b3a3eb --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.io.Writable;
+import java.nio.channels.WritableByteChannel;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes Arrow batches to an {@link org.apache.arrow.vector.ipc.ArrowStreamWriter}.
+ * The byte stream will be formatted according to the Arrow Streaming format.
+ * Because ArrowStreamWriter is bound to a {@link org.apache.arrow.vector.VectorSchemaRoot}
+ * when it is created, calls to the {@link #write(Writable, Writable)} method only serve
+ * as a signal that a new batch has been loaded into the associated VectorSchemaRoot.
+ * Payload data for writing is made available indirectly, by reference:
+ * {@code ArrowStreamWriter -> VectorSchemaRoot -> List<FieldVector>},
+ * i.e. both the key and the value are ignored once a reference to the VectorSchemaRoot
+ * has been obtained from the first value.
+ */
+public class LlapArrowRecordWriter<K extends Writable, V extends Writable>
+    implements RecordWriter<K, V> {
+  public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class);
+
+  // Created lazily by the first write(), once the VectorSchemaRoot is known.
+  ArrowStreamWriter arrowStreamWriter;
+  // Destination for the Arrow stream bytes.
+  WritableByteChannel out;
+
+  public LlapArrowRecordWriter(WritableByteChannel out) {
+    this.out = out;
+  }
+
+  /**
+   * Finishes the Arrow stream and releases the destination channel.
+   * If write() was never called there is no ArrowStreamWriter to close, so the channel
+   * is closed directly instead of failing with a NullPointerException (which would also
+   * have left the channel open).
+   */
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    if (arrowStreamWriter != null) {
+      // The ArrowStreamWriter was built over `out`; presumably its close() also closes
+      // that channel. NOTE(review): confirm against the Arrow version in use.
+      arrowStreamWriter.close();
+    } else {
+      LOG.warn("Closing Arrow output channel before any batch was written");
+      out.close();
+    }
+  }
+
+  /**
+   * Writes the batch currently loaded in the value's VectorSchemaRoot.
+   * The first call binds an ArrowStreamWriter to that VectorSchemaRoot (with a null
+   * second constructor argument); later calls reuse it, so the key, and every value
+   * after the first, are not inspected beyond the cast.
+   *
+   * @throws ClassCastException if value is not an ArrowWrapperWritable
+   */
+  @Override
+  public void write(K key, V value) throws IOException {
+    ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) value;
+    if (arrowStreamWriter == null) {
+      VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
+      arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out);
+    }
+    arrowStreamWriter.writeBatch();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java index 30d5eb5..c71c637 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java @@ -198,11 +198,16 @@ public class LlapOutputFormatService { LOG.debug("registering socket for: " + id); int maxPendingWrites = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES); + boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW); @SuppressWarnings("rawtypes") - LlapRecordWriter writer = new LlapRecordWriter(id, + RecordWriter writer = null; + if(useArrow) { + writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id)); + } else { + writer = new LlapRecordWriter(id, new ChunkedOutputStream( - new
ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites), - sendBufferSize, id)); + new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites), sendBufferSize, id)); + } boolean isFailed = true; synchronized (lock) { if (!writers.containsKey(id)) { http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java new file mode 100644 index 0000000..57da1d9 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.Semaphore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+
+/**
+ * Provides an adapter between {@link java.nio.channels.WritableByteChannel}
+ * and {@link io.netty.channel.ChannelHandlerContext}.
+ * Additionally provides a form of flow-control by limiting the number of
+ * queued async writes.
+ * Write and close failures reported by Netty are logged; they are not rethrown
+ * to the caller of write()/close().
+ */
+public class WritableByteChannelAdapter implements WritableByteChannel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WritableByteChannelAdapter.class);
+  // Set to null by close().
+  private ChannelHandlerContext chc;
+  private final int maxPendingWrites;
+  // This semaphore provides two functions:
+  // 1. Forces a cap on the number of outstanding async writes to channel
+  // 2. Ensures that channel isn't closed if there are any outstanding async writes
+  // NOTE(review): assumes maxPendingWrites > 0; with 0 permits every write() blocks forever.
+  private final Semaphore writeResources;
+  private boolean closed = false;
+  private final String id;
+
+  private final ChannelFutureListener writeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      // Async write completed: return the permit taken in write().
+      writeResources.release();
+
+      if (future.isCancelled()) {
+        LOG.error("Write cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Write error on ID " + id, future.cause());
+      }
+    }
+  };
+
+  private final ChannelFutureListener closeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      if (future.isCancelled()) {
+        LOG.error("Close cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Close failed on ID " + id, future.cause());
+      }
+    }
+  };
+
+  public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) {
+    this.chc = chc;
+    this.maxPendingWrites = maxPendingWrites;
+    this.writeResources = new Semaphore(maxPendingWrites);
+    this.id = id;
+  }
+
+  /**
+   * Copies the remaining bytes of {@code src} into a new Netty buffer and queues an
+   * asynchronous write-and-flush of that copy. Blocks first while
+   * {@code maxPendingWrites} writes are still outstanding.
+   *
+   * <p>The bytes are copied because the write completes after this method returns,
+   * when the caller may already be reusing {@code src}. The copy also advances
+   * {@code src}'s position to its limit, as the WritableByteChannel contract requires.
+   * Without that, callers that loop until {@code src.hasRemaining()} is false would
+   * never terminate.
+   *
+   * @return the number of bytes consumed from {@code src} (all of its remaining bytes)
+   * @throws ClosedChannelException if close() has already been called
+   * @throws IOException if interrupted while waiting for a write slot
+   */
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    if (closed) {
+      throw new ClosedChannelException();
+    }
+    int size = src.remaining();
+    // Down the semaphore or block until available
+    takeWriteResources(1);
+    // ByteBuf.writeBytes(ByteBuffer) copies src and moves its position to its limit.
+    chc.writeAndFlush(Unpooled.buffer(size).writeBytes(src)).addListener(writeListener);
+    return size;
+  }
+
+  /** Returns false once close() has been called, instead of dereferencing the cleared context. */
+  @Override
+  public boolean isOpen() {
+    return !closed && chc.channel().isOpen();
+  }
+
+  /**
+   * Waits for all outstanding async writes to complete, then closes the Netty channel
+   * asynchronously. A second call has no effect, per the Closeable contract.
+   *
+   * @throws IOException if interrupted while waiting for outstanding writes
+   */
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      return;
+    }

+    closed = true;
+    // Block until all semaphore resources are released by outstanding async writes,
+    // so the channel is not closed underneath them.
+    takeWriteResources(maxPendingWrites);
+
+    try {
+      chc.close().addListener(closeListener);
+    } finally {
+      chc = null;
+    }
+  }
+
+  /**
+   * Acquires {@code numResources} write permits, blocking until they are available.
+   * On interrupt, the thread's interrupt status is restored before the IOException
+   * is thrown, and the InterruptedException is kept as its cause.
+   */
+  private void takeWriteResources(int numResources) throws IOException {
+    try {
+      writeResources.acquire(numResources);
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted while waiting for write resources for " + id, ie);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 01a5b4c..9c57eff 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.io.HivePartitioner; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter; import org.apache.hadoop.hive.ql.io.StreamingOutputFormat; +import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveFatalException; import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; @@ -1251,16 +1252,25 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements // If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size) // and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full // (the size of buffer is kept track of in the ThriftJDBCBinarySerDe). 
- if (conf.isUsingThriftJDBCBinarySerDe()) { - try { - recordValue = serializer.serialize(null, inputObjInspectors[0]); - if ( null != fpaths ) { - rowOutWriters = fpaths.outWriters; - rowOutWriters[0].write(recordValue); + if (conf.isUsingBatchingSerDe()) { + try { + recordValue = serializer.serialize(null, inputObjInspectors[0]); + if (null != fpaths) { + rowOutWriters = fpaths.outWriters; + rowOutWriters[0].write(recordValue); + } else if(recordValue instanceof ArrowWrapperWritable) { + //Because LLAP arrow output depends on the ThriftJDBCBinarySerDe code path + //this is required for 0 row outputs + //i.e. we need to write a 0 size batch to signal EOS to the consumer + for (FSPaths fsPaths : valToPaths.values()) { + for(RecordWriter writer : fsPaths.outWriters) { + writer.write(recordValue); + } } - } catch (SerDeException | IOException e) { - throw new HiveException(e); } + } catch (SerDeException | IOException e) { + throw new HiveException(e); + } } List<Path> commitPaths = new ArrayList<>(); for (FSPaths fsp : valToPaths.values()) { http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index bc62f3c..863ab1b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.AcidUtils.Operation; +import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; 
import org.apache.hadoop.hive.ql.io.HiveOutputFormat; @@ -7498,7 +7499,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT); Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class; if (fileFormat.equals(PlanUtils.LLAP_OUTPUT_FORMAT_KEY)) { - serdeClass = LazyBinarySerDe2.class; + boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW); + if(useArrow) { + serdeClass = ArrowColumnarBatchSerDe.class; + } else { + serdeClass = LazyBinarySerDe2.class; + } } table_desc = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, @@ -7579,13 +7585,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ltd.setInsertOverwrite(true); } } - if (SessionState.get().isHiveServerQuery() && - null != table_desc && - table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) && - HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) { - fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true); + if (null != table_desc && useBatchingSerializer(table_desc.getSerdeClassName())) { + fileSinkDesc.setIsUsingBatchingSerDe(true); } else { - fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false); + fileSinkDesc.setIsUsingBatchingSerDe(false); } Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild( @@ -7620,6 +7623,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { return output; } + /** Decides whether the file sink is flagged as using a batching SerDe: true only for a HiveServer query (SessionState.get().isHiveServerQuery()) whose serde class passes hasSetBatchSerializer(). NOTE(review): SessionState.get() is dereferenced without a null check, and the Arrow SerDe is gated on isHiveServerQuery() as well; confirm both hold for LLAP external-client queries, because FileSinkOperator's zero-row end-of-stream Arrow batch is only written when this flag is set. */ private boolean useBatchingSerializer(String serdeClassName) { + return SessionState.get().isHiveServerQuery() && + hasSetBatchSerializer(serdeClassName); + } + + /** True for ThriftJDBCBinarySerDe when HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS is enabled, or for ArrowColumnarBatchSerDe regardless of that setting; class names are compared case-insensitively. */ private boolean hasSetBatchSerializer(String serdeClassName) { + return (serdeClassName.equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) && + HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) || 
serdeClassName.equalsIgnoreCase(ArrowColumnarBatchSerDe.class.getName()); + } + private ColsAndTypes deriveFileSinkColTypes( RowResolver inputRR, List<FieldSchema> field_schemas) throws SemanticException { ColsAndTypes result = new ColsAndTypes("", ""); http://git-wip-us.apache.org/repos/asf/hive/blob/f7f90a04/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index fcb6de7..1d05468 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -103,9 +103,9 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe /** * Whether is a HiveServer query, and the destination table is - * indeed written using ThriftJDBCBinarySerDe + * indeed written using a row batching SerDe */ - private boolean isUsingThriftJDBCBinarySerDe = false; + private boolean isUsingBatchingSerDe = false; private boolean isInsertOverwrite = false; @@ -183,12 +183,12 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe this.isHiveServerQuery = isHiveServerQuery; } - public boolean isUsingThriftJDBCBinarySerDe() { - return this.isUsingThriftJDBCBinarySerDe; + public boolean isUsingBatchingSerDe() { + return this.isUsingBatchingSerDe; } - public void setIsUsingThriftJDBCBinarySerDe(boolean isUsingThriftJDBCBinarySerDe) { - this.isUsingThriftJDBCBinarySerDe = isUsingThriftJDBCBinarySerDe; + public void setIsUsingBatchingSerDe(boolean isUsingBatchingSerDe) { + this.isUsingBatchingSerDe = isUsingBatchingSerDe; } @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })