Repository: hive
Updated Branches:
  refs/heads/master 4b418a4ae -> d04db94f6


HIVE-19307: Support ArrowOutputStream in LlapOutputFormatService (Eric Wohlstadter, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d04db94f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d04db94f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d04db94f

Branch: refs/heads/master
Commit: d04db94f6c21050edf5d782efcc23fe48192d624
Parents: 4b418a4
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue May 15 14:25:40 2018 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue May 15 14:27:30 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../hadoop/hive/llap/LlapArrowRecordWriter.java |  70 +++++++++++
 .../hive/llap/LlapOutputFormatService.java      |  11 +-
 .../hive/llap/WritableByteChannelAdapter.java   | 125 +++++++++++++++++++
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  26 ++--
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  28 +++--
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |  12 +-
 7 files changed, 251 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d04db94f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 0a997a1..b12a7a4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -397,6 +397,7 @@ public class HiveConf extends Configuration {
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_VALIDATE_ACLS.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_LOGGER.varname);
     llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_AM_USE_FQDN.varname);
+    llapDaemonVarsSetLocal.add(ConfVars.LLAP_OUTPUT_FORMAT_ARROW.varname);
   }
 
   /**
@@ -4165,6 +4166,8 @@ public class HiveConf extends Configuration {
             Constants.LLAP_LOGGER_NAME_RFA,
             Constants.LLAP_LOGGER_NAME_CONSOLE),
         "logger used for llap-daemons."),
+    LLAP_OUTPUT_FORMAT_ARROW("hive.llap.output.format.arrow", false,
+      "Whether LlapOutputFormatService should output Arrow batches"),
 
     HIVE_TRIGGER_VALIDATION_INTERVAL("hive.trigger.validation.interval", "500ms",
       new TimeValidator(TimeUnit.MILLISECONDS),

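For context, a minimal sketch (not part of the commit) of toggling and reading the new flag in Java; the lookup mirrors the one added to LlapOutputFormatService below.

  HiveConf conf = new HiveConf();
  // enable Arrow output for the LLAP output format service
  conf.setBoolVar(HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
  boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
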
http://git-wip-us.apache.org/repos/asf/hive/blob/d04db94f/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
new file mode 100644
index 0000000..1b3a3eb
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.io.Writable;
+import java.nio.channels.WritableByteChannel;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes Arrow batches to an {@link org.apache.arrow.vector.ipc.ArrowStreamWriter}.
+ * The byte stream will be formatted according to the Arrow Streaming format.
+ * Because ArrowStreamWriter is bound to a {@link org.apache.arrow.vector.VectorSchemaRoot}
+ * when it is created, calls to the {@link #write(Writable, Writable)} method only
+ * serve as a signal that a new batch has been loaded to the associated VectorSchemaRoot.
+ * Payload data for writing is indirectly made available by reference:
+ * ArrowStreamWriter -> VectorSchemaRoot -> List<FieldVector>
+ * i.e. both the key and value are ignored once a reference to the VectorSchemaRoot
+ * is obtained.
+ */
+public class LlapArrowRecordWriter<K extends Writable, V extends Writable>
+    implements RecordWriter<K, V> {
+  public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class);
+
+  ArrowStreamWriter arrowStreamWriter;
+  WritableByteChannel out;
+
+  public LlapArrowRecordWriter(WritableByteChannel out) {
+    this.out = out;
+  }
+
+  @Override
+  public void close(Reporter reporter) throws IOException {
+    arrowStreamWriter.close();
+  }
+
+  @Override
+  public void write(K key, V value) throws IOException {
+    ArrowWrapperWritable arrowWrapperWritable = (ArrowWrapperWritable) value;
+    if (arrowStreamWriter == null) {
+      VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot();
+      arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out);
+    }
+    arrowStreamWriter.writeBatch();
+  }
+}

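As orientation for the new writer, a hypothetical usage sketch (not taken from the commit). It assumes a VectorSchemaRoot named root that has already been populated elsewhere (for example by ArrowColumnarBatchSerDe) and that ArrowWrapperWritable exposes a constructor wrapping that root; Channels, FileOutputStream, NullWritable and Reporter.NULL are the usual JDK/Hadoop classes, and the snippet is assumed to run in a method that throws IOException.

  // Hypothetical sketch; "root" is an assumption, produced elsewhere.
  WritableByteChannel channel = Channels.newChannel(new FileOutputStream("/tmp/batches.arrow"));
  LlapArrowRecordWriter<NullWritable, ArrowWrapperWritable> writer =
      new LlapArrowRecordWriter<>(channel);
  // the key is ignored; the call signals that root now holds a new batch
  writer.write(NullWritable.get(), new ArrowWrapperWritable(root));
  // closes the underlying ArrowStreamWriter, ending the Arrow stream
  writer.close(Reporter.NULL);
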
http://git-wip-us.apache.org/repos/asf/hive/blob/d04db94f/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 30d5eb5..c71c637 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -198,11 +198,16 @@ public class LlapOutputFormatService {
       LOG.debug("registering socket for: " + id);
       int maxPendingWrites = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
+      boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
       @SuppressWarnings("rawtypes")
-      LlapRecordWriter writer = new LlapRecordWriter(id,
+      RecordWriter writer = null;
+      if(useArrow) {
+        writer = new LlapArrowRecordWriter(new WritableByteChannelAdapter(ctx, maxPendingWrites, id));
+      } else {
+        writer = new LlapRecordWriter(id,
           new ChunkedOutputStream(
-              new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
-              sendBufferSize, id));
+            new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites), sendBufferSize, id));
+      }
       boolean isFailed = true;
       synchronized (lock) {
         if (!writers.containsKey(id)) {

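The reading side is not part of this change, but a hypothetical consumer sketch may help: the bytes pushed through WritableByteChannelAdapter form a standard Arrow stream, so a client could decode them with Arrow's ArrowStreamReader. Here socketIn (an InputStream obtained from the LLAP output service connection) and processBatch() are placeholder names, and the code is assumed to run in a method that throws IOException.

  try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
       ArrowStreamReader reader = new ArrowStreamReader(socketIn, allocator)) {
    VectorSchemaRoot root = reader.getVectorSchemaRoot();
    while (reader.loadNextBatch()) {
      // root now holds the next batch; per the FileSinkOperator change below,
      // an empty result still yields one zero-row batch
      processBatch(root);
    }
  }
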
http://git-wip-us.apache.org/repos/asf/hive/blob/d04db94f/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
new file mode 100644
index 0000000..57da1d9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.Semaphore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+
+/**
+ * Provides an adapter between {@link java.nio.channels.WritableByteChannel}
+ * and {@link io.netty.channel.ChannelHandlerContext}.
+ * Additionally provides a form of flow-control by limiting the number of
+ * queued async writes.
+ */
+public class WritableByteChannelAdapter implements WritableByteChannel {
+
+  private static final Logger LOG = LoggerFactory.getLogger(WritableByteChannelAdapter.class);
+  private ChannelHandlerContext chc;
+  private final int maxPendingWrites;
+  // This semaphore provides two functions:
+  // 1. Forces a cap on the number of outstanding async writes to channel
+  // 2. Ensures that channel isn't closed if there are any outstanding async writes
+  private final Semaphore writeResources;
+  private boolean closed = false;
+  private final String id;
+
+  private ChannelFutureListener writeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      //Async write completed
+      //Up the semaphore
+      writeResources.release();
+
+      if (future.isCancelled()) {
+        LOG.error("Write cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Write error on ID " + id, future.cause());
+      }
+    }
+  };
+
+  private ChannelFutureListener closeListener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      if (future.isCancelled()) {
+        LOG.error("Close cancelled on ID " + id);
+      } else if (!future.isSuccess()) {
+        LOG.error("Close failed on ID " + id, future.cause());
+      }
+    }
+  };
+
+  public WritableByteChannelAdapter(ChannelHandlerContext chc, int maxPendingWrites, String id) {
+    this.chc = chc;
+    this.maxPendingWrites = maxPendingWrites;
+    this.writeResources = new Semaphore(maxPendingWrites);
+    this.id = id;
+  }
+
+  @Override
+  public int write(ByteBuffer src) throws IOException {
+    int size = src.remaining();
+    //Down the semaphore or block until available
+    takeWriteResources(1);
+    chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener);
+    return size;
+  }
+
+  @Override
+  public boolean isOpen() {
+    return chc.channel().isOpen();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      throw new IOException("Already closed: " + id);
+    }
+
+    closed = true;
+    //Block until all semaphore resources are released
+    //by outstanding async writes
+    takeWriteResources(maxPendingWrites);
+
+    try {
+      chc.close().addListener(closeListener);
+    } finally {
+      chc = null;
+      closed = true;
+    }
+  }
+
+  private void takeWriteResources(int numResources) throws IOException {
+    try {
+      writeResources.acquire(numResources);
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted while waiting for write resources for 
" + id);
+    }
+  }
+}

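A hypothetical sketch of the flow control the adapter provides (not in the commit); ctx (a Netty ChannelHandlerContext) and payload (a byte[]) are placeholders.

  // cap outstanding async writes at 8 for this connection id
  WritableByteChannel channel = new WritableByteChannelAdapter(ctx, 8, "query-1");
  channel.write(ByteBuffer.wrap(payload)); // blocks when 8 writes are already in flight
  channel.close();                         // blocks until all outstanding writes have completed
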
http://git-wip-us.apache.org/repos/asf/hive/blob/d04db94f/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 01a5b4c..9c57eff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.io.HivePartitioner;
 import org.apache.hadoop.hive.ql.io.RecordUpdater;
 import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.StreamingOutputFormat;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -1251,16 +1252,25 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // If serializer is ThriftJDBCBinarySerDe, then it buffers rows to a certain limit (hive.server2.thrift.resultset.max.fetch.size)
       // and serializes the whole batch when the buffer is full. The serialize returns null if the buffer is not full
       // (the size of buffer is kept track of in the ThriftJDBCBinarySerDe).
-      if (conf.isUsingThriftJDBCBinarySerDe()) {
-          try {
-            recordValue = serializer.serialize(null, inputObjInspectors[0]);
-            if ( null != fpaths ) {
-              rowOutWriters = fpaths.outWriters;
-              rowOutWriters[0].write(recordValue);
+      if (conf.isUsingBatchingSerDe()) {
+        try {
+          recordValue = serializer.serialize(null, inputObjInspectors[0]);
+          if (null != fpaths) {
+            rowOutWriters = fpaths.outWriters;
+            rowOutWriters[0].write(recordValue);
+          } else if(recordValue instanceof ArrowWrapperWritable) {
+            //Because LLAP arrow output depends on the ThriftJDBCBinarySerDe code path
+            //this is required for 0 row outputs
+            //i.e. we need to write a 0 size batch to signal EOS to the consumer
+            for (FSPaths fsPaths : valToPaths.values()) {
+              for(RecordWriter writer : fsPaths.outWriters) {
+                writer.write(recordValue);
+              }
             }
-          } catch (SerDeException | IOException e) {
-            throw new HiveException(e);
           }
+        } catch (SerDeException | IOException e) {
+          throw new HiveException(e);
+        }
       }
       List<Path> commitPaths = new ArrayList<>();
       for (FSPaths fsp : valToPaths.values()) {

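A hypothetical sketch of the batching contract described in the comment above (not production code): the SerDe buffers rows internally, and serialize() only returns a non-null, fully serialized batch once its configured limit is reached.

  Writable maybeBatch = serializer.serialize(null, inputObjInspectors[0]);
  if (maybeBatch != null) {
    // a complete batch is ready for the record writer; otherwise rows are still buffering
  }
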
http://git-wip-us.apache.org/repos/asf/hive/blob/d04db94f/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index ff952b6..7ff7e18 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -109,6 +109,7 @@ import org.apache.hadoop.hive.ql.io.AcidInputFormat;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils.Operation;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -7492,7 +7493,12 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
             fileFormat = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT);
             Class<? extends Deserializer> serdeClass = LazySimpleSerDe.class;
             if (fileFormat.equals(PlanUtils.LLAP_OUTPUT_FORMAT_KEY)) {
-              serdeClass = LazyBinarySerDe2.class;
+              boolean useArrow = HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_OUTPUT_FORMAT_ARROW);
+              if(useArrow) {
+                serdeClass = ArrowColumnarBatchSerDe.class;
+              } else {
+                serdeClass = LazyBinarySerDe2.class;
+              }
             }
             table_desc =
                 PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat,
@@ -7573,13 +7579,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         ltd.setInsertOverwrite(true);
       }
     }
-    if (SessionState.get().isHiveServerQuery() &&
-        null != table_desc &&
-        
table_desc.getSerdeClassName().equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName())
 &&
-        
HiveConf.getBoolVar(conf,HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS))
 {
-      fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(true);
+    if (null != table_desc && 
useBatchingSerializer(table_desc.getSerdeClassName())) {
+      fileSinkDesc.setIsUsingBatchingSerDe(true);
     } else {
-      fileSinkDesc.setIsUsingThriftJDBCBinarySerDe(false);
+      fileSinkDesc.setIsUsingBatchingSerDe(false);
     }
 
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
@@ -7614,6 +7617,17 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     return output;
   }
 
+  private boolean useBatchingSerializer(String serdeClassName) {
+    return SessionState.get().isHiveServerQuery() &&
+      hasSetBatchSerializer(serdeClassName);
+  }
+
+  private boolean hasSetBatchSerializer(String serdeClassName) {
+    return (serdeClassName.equalsIgnoreCase(ThriftJDBCBinarySerDe.class.getName()) &&
+      HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_SERVER2_THRIFT_RESULTSET_SERIALIZE_IN_TASKS)) ||
+    serdeClassName.equalsIgnoreCase(ArrowColumnarBatchSerDe.class.getName());
+  }
+
   private ColsAndTypes deriveFileSinkColTypes(
       RowResolver inputRR, List<FieldSchema> field_schemas) throws 
SemanticException {
     ColsAndTypes result = new ColsAndTypes("", "");

http://git-wip-us.apache.org/repos/asf/hive/blob/d04db94f/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index fcb6de7..1d05468 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -103,9 +103,9 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
 
   /**
    * Whether is a HiveServer query, and the destination table is
-   * indeed written using ThriftJDBCBinarySerDe
+   * indeed written using a row batching SerDe
    */
-  private boolean isUsingThriftJDBCBinarySerDe = false;
+  private boolean isUsingBatchingSerDe = false;
 
   private boolean isInsertOverwrite = false;
 
@@ -183,12 +183,12 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
          this.isHiveServerQuery = isHiveServerQuery;
   }
 
-  public boolean isUsingThriftJDBCBinarySerDe() {
-         return this.isUsingThriftJDBCBinarySerDe;
+  public boolean isUsingBatchingSerDe() {
+    return this.isUsingBatchingSerDe;
   }
 
-  public void setIsUsingThriftJDBCBinarySerDe(boolean isUsingThriftJDBCBinarySerDe) {
-         this.isUsingThriftJDBCBinarySerDe = isUsingThriftJDBCBinarySerDe;
+  public void setIsUsingBatchingSerDe(boolean isUsingBatchingSerDe) {
+    this.isUsingBatchingSerDe = isUsingBatchingSerDe;
   }
 
   @Explain(displayName = "directory", explainLevels = { Level.EXTENDED })
