This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new f6aa916a171 HIVE-27765: Backport of HIVE-20052, HIVE-20093, 
HIVE-20203, HIVE-20290, HIVE-20300, HIVE-20312, HIVE-20044, HIVE-21966 to 
branch-3(#4772)
f6aa916a171 is described below

commit f6aa916a17109d4f64e7dd878b49912b79dd8d75
Author: Aman Raj <104416558+amanraj2...@users.noreply.github.com>
AuthorDate: Tue Oct 10 10:41:32 2023 +0530

    HIVE-27765: Backport of HIVE-20052, HIVE-20093, HIVE-20203, HIVE-20290, 
HIVE-20300, HIVE-20312, HIVE-20044, HIVE-21966 to branch-3(#4772)
    
    * HIVE-20052: Arrow serde should fill ArrowColumnVector(Decimal) with the 
given schema precision/scale
    * HIVE-20093: LlapOutputFormatService: Use ArrowBuf with Netty for Accounting
    * HIVE-20203: Arrow SerDe leaks a DirectByteBuffer
    * HIVE-20290: Lazy initialize ArrowColumnarBatchSerDe so it doesn't 
allocate buffers during GetSplits
    * HIVE-20300: VectorFileSinkArrowOperator
    * HIVE-20312: Allow arrow clients to use their own BufferAllocator with 
LlapOutputFormatService
    * HIVE-20044: Arrow Serde should pad char values and handle empty strings 
correctly
    * HIVE-21966: Llap external client - Arrow Serializer throws 
ArrayIndexOutOfBoundsException in some cases
    ---------
    
    Co-authored-by: Eric Wohlstadter <ewohlstad...@hortonworks.com>
    Co-authored-by: Nikhil Gupta <gup...@microsoft.com>
    Co-authored-by: Teddy Choi <pudi...@gmail.com>
    Co-authored-by: Shubham Chaurasia <schaura...@cloudera.com>
    
    Signed-off-by: Sankar Hariappan <sank...@apache.org>
    Closes (#4772)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   8 +-
 .../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java |  45 +-
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java       |   7 +-
 .../apache/hive/jdbc/TestJdbcWithMiniLlapRow.java  |   6 +-
 ...w.java => TestJdbcWithMiniLlapVectorArrow.java} | 297 ++++---
 .../hive/llap/LlapArrowBatchRecordReader.java      |  15 +-
 .../hadoop/hive/llap/LlapArrowRowInputFormat.java  |  14 +-
 .../hadoop/hive/llap/LlapBaseInputFormat.java      |  25 +-
 .../hadoop/hive/llap/LlapArrowRecordWriter.java    |  25 +-
 .../hive/llap/WritableByteChannelAdapter.java      |  12 +-
 .../filesink/VectorFileSinkArrowOperator.java      | 180 +++++
 .../hive/ql/io/arrow/ArrowColumnarBatchSerDe.java  |  20 +-
 .../hive/ql/io/arrow/ArrowWrapperWritable.java     |  19 +
 .../apache/hadoop/hive/ql/io/arrow/Serializer.java | 865 +++++++++++++++------
 .../hive/ql/optimizer/physical/Vectorizer.java     |  60 +-
 .../ql/io/arrow/TestArrowColumnarBatchSerDe.java   |  53 ++
 .../ql/exec/vector/expressions/StringExpr.java     |  15 +
 17 files changed, 1268 insertions(+), 398 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 3ec99315a27..bf20a78b588 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2682,6 +2682,8 @@ public class HiveConf extends Configuration {
     // For Arrow SerDe
     HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", 
Long.MAX_VALUE,
         "Arrow root allocator memory size limitation in bytes."),
+    HIVE_ARROW_BATCH_ALLOCATOR_LIMIT("hive.arrow.batch.allocator.limit", 
10_000_000_000L,
+        "Max bytes per arrow batch. This is a threshold, the memory is not 
pre-allocated."),
     HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows 
sent in one Arrow batch."),
 
     // For Druid storage handler
@@ -3690,7 +3692,11 @@ public class HiveConf extends Configuration {
         "internal use only. When false, don't suppress fatal exceptions 
like\n" +
         "NullPointerException, etc so the query will fail and assure it will 
be noticed",
         true),
-
+    HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED(
+        "hive.vectorized.execution.filesink.arrow.native.enabled", true,
+        "This flag should be set to true to enable the native vectorization\n" 
+
+        "of queries using the Arrow SerDe and FileSink.\n" +
+        "The default value is true."),
     HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property 
has been extended to control "
         + "whether to check, convert, and normalize partition value to conform 
to its column type in "
         + "partition operations including but not limited to insert, such as 
alter, describe etc."),
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index 5cf765d8eb8..fbcd229d224 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -102,38 +102,20 @@ import org.apache.hadoop.mapred.InputFormat;
  * by sub-classes in a {@link org.junit.BeforeClass} initializer
  */
 public abstract class BaseJdbcWithMiniLlap {
-  private static MiniHS2 miniHS2 = null;
+
   private static String dataFileDir;
   private static Path kvDataFilePath;
   private static Path dataTypesFilePath;
 
-  private static HiveConf conf = null;
-  private static Connection hs2Conn = null;
+  protected static MiniHS2 miniHS2 = null;
+  protected static HiveConf conf = null;
+  protected static Connection hs2Conn = null;
 
   // This method should be called by sub-classes in a @BeforeClass initializer
-  public static MiniHS2 beforeTest(boolean useArrow) throws Exception {
+  public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception {
+    conf = inputConf;
     Class.forName(MiniHS2.getJdbcDriverName());
-
-    String confDir = "../../data/conf/llap/";
-    if (confDir != null && !confDir.isEmpty()) {
-      HiveConf.setHiveSiteLocation(new URL("file://"+ new 
File(confDir).toURI().getPath() + "/hive-site.xml"));
-      System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
-    }
-
-    conf = new HiveConf();
-    conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
-    conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
-    if(useArrow) {
-      conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
-    } else {
-      conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
-    }
-
-    conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
-        + "/tez-site.xml"));
-
     miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
-
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", 
"");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
     dataTypesFilePath = new Path(dataFileDir, "datatypes.txt");
@@ -143,6 +125,19 @@ public abstract class BaseJdbcWithMiniLlap {
     return miniHS2;
   }
 
+  static HiveConf defaultConf() throws Exception {
+    String confDir = "../../data/conf/llap/";
+    if (confDir != null && !confDir.isEmpty()) {
+      HiveConf.setHiveSiteLocation(new URL("file://"+ new 
File(confDir).toURI().getPath() + "/hive-site.xml"));
+      System.out.println("Setting hive-site: " + 
HiveConf.getHiveSiteLocation());
+    }
+    HiveConf defaultConf = new HiveConf();
+    defaultConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    defaultConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    defaultConf.addResource(new URL("file://" + new 
File(confDir).toURI().getPath() + "/tez-site.xml"));
+    return defaultConf;
+  }
+
   @Before
   public void setUp() throws Exception {
     hs2Conn = getConnection(miniHS2.getJdbcURL(), 
System.getProperty("user.name"), "bar");
@@ -549,6 +544,8 @@ public abstract class BaseJdbcWithMiniLlap {
         rowProcessor.process(row);
         ++rowCount;
       }
+       //In arrow-mode this will throw exception unless all buffers have been 
released
+       //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader
       reader.close();
     }
     LlapBaseInputFormat.close(handleId);
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index 0c6acd8495c..3dcc4928b1a 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -62,9 +64,10 @@ public class TestJdbcWithMiniLlapArrow extends 
BaseJdbcWithMiniLlap {
 
   @BeforeClass
   public static void beforeTest() throws Exception {
-    HiveConf conf = new HiveConf();
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
     MiniHS2.cleanupLocalDir();
-    miniHS2 = BaseJdbcWithMiniLlap.beforeTest(true);
+    miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", 
"");
 
     Connection conDefault = 
BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
index 809068fe3e7..d954d0e2fe6 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java
@@ -25,6 +25,8 @@ import org.junit.BeforeClass;
 import org.junit.Before;
 import org.junit.After;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
 /**
  * TestJdbcWithMiniLlap for llap Row format.
@@ -33,7 +35,9 @@ public class TestJdbcWithMiniLlapRow extends 
BaseJdbcWithMiniLlap {
 
   @BeforeClass
   public static void beforeTest() throws Exception {
-    BaseJdbcWithMiniLlap.beforeTest(false);
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false);
+    BaseJdbcWithMiniLlap.beforeTest(conf);
   }
 
   @Override
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
similarity index 54%
copy from 
itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
copy to 
itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
index 0c6acd8495c..35eda6cb0a6 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java
@@ -18,69 +18,42 @@
 
 package org.apache.hive.jdbc;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
+
 import java.math.BigDecimal;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.Timestamp;
+
+import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.hive.llap.FieldDesc;
 import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 
-import org.junit.AfterClass;
-import org.junit.Test;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Connection;
-import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
-import org.apache.hive.jdbc.miniHS2.MiniHS2;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.junit.Test;
 
 /**
- * TestJdbcWithMiniLlap for Arrow format
+ * TestJdbcWithMiniLlap for Arrow format with vectorized output sink
  */
-@Ignore("unstable HIVE-23549")
-public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
+public class TestJdbcWithMiniLlapVectorArrow extends BaseJdbcWithMiniLlap {
 
-  private static MiniHS2 miniHS2 = null;
-  private static final String tableName = "testJdbcMinihs2Tbl";
-  private static String dataFileDir;
-  private static final String testDbName = "testJdbcMinihs2";
-
-  private static class ExceptionHolder {
-    Throwable throwable;
-  }
 
   @BeforeClass
   public static void beforeTest() throws Exception {
-    HiveConf conf = new HiveConf();
-    MiniHS2.cleanupLocalDir();
-    miniHS2 = BaseJdbcWithMiniLlap.beforeTest(true);
-    dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", 
"");
-
-    Connection conDefault = 
BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
-            System.getProperty("user.name"), "bar");
-    Statement stmt = conDefault.createStatement();
-    stmt.execute("drop database if exists " + testDbName + " cascade");
-    stmt.execute("create database " + testDbName);
-    stmt.close();
-    conDefault.close();
-  }
-
-  @AfterClass
-  public static void afterTest() {
-    if (miniHS2 != null && miniHS2.isStarted()) {
-      miniHS2.stop();
-    }
+    HiveConf conf = defaultConf();
+    conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, 
true);
+    BaseJdbcWithMiniLlap.beforeTest(conf);
   }
 
   @Override
@@ -266,94 +239,172 @@ public class TestJdbcWithMiniLlapArrow extends 
BaseJdbcWithMiniLlap {
     assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]);
   }
 
-  /**
-   * SleepMsUDF
-   */
-  public static class SleepMsUDF extends UDF {
-    public Integer evaluate(int value, int ms) {
-      try {
-        Thread.sleep(ms);
-      } catch (InterruptedException e) {
-        // No-op
-      }
-      return value;
+
+  @Test
+  public void testTypesNestedInListWithLimitAndFilters() throws Exception {
+    try (Statement statement = hs2Conn.createStatement()) {
+      statement.execute("CREATE TABLE complex_tbl(c1 array<string>, " +
+          "c2 array<struct<f1:string,f2:string>>, " +
+          "c3 array<array<struct<f1:string,f2:string>>>, " +
+          "c4 int) STORED AS ORC");
+
+      statement.executeUpdate("INSERT INTO complex_tbl VALUES " +
+          "(" +
+          "ARRAY('a1', 'a2', 'a3', null), " +
+          "ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 
'f2','a4')), " +
+          "ARRAY((ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), 
NAMED_STRUCT('f1','a3', 'f2','a4')))), " +
+          "1),      " +
+          "(" +
+          "ARRAY('b1'), " +
+          "ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 
'f2','b4')), " +
+          "ARRAY((ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), 
NAMED_STRUCT('f1','b3', 'f2','b4'))), " +
+          "(ARRAY(NAMED_STRUCT('f1','b5', 'f2','b6'), NAMED_STRUCT('f1','b7', 
'f2','b8')))), " +
+          "2), " +
+          "(" +
+          "ARRAY('c1', 'c2'), ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), 
NAMED_STRUCT('f1','c3', 'f2','c4'), " +
+          "NAMED_STRUCT('f1','c5', 'f2','c6')), 
ARRAY((ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), " +
+          "NAMED_STRUCT('f1','c3', 'f2','c4'))), 
(ARRAY(NAMED_STRUCT('f1','c5', 'f2','c6'), " +
+          "NAMED_STRUCT('f1','c7', 'f2','c8'))), 
(ARRAY(NAMED_STRUCT('f1','c9', 'f2','c10'), " +
+          "NAMED_STRUCT('f1','c11', 'f2','c12')))), " +
+          "3), " +
+          "(" +
+          "ARRAY(null), " +
+          "ARRAY(NAMED_STRUCT('f1','d1', 'f2','d2'), NAMED_STRUCT('f1','d3', 
'f2','d4'), " +
+          "NAMED_STRUCT('f1','d5', 'f2','d6'), NAMED_STRUCT('f1','d7', 
'f2','d8')), " +
+          "ARRAY((ARRAY(NAMED_STRUCT('f1','d1', 'f2', 'd2')))), " +
+          "4)");
+
     }
+
+    List<Object[]> expected = new ArrayList<>();
+    expected.add(new Object[]{
+        asList("a1", "a2", "a3", null),
+        asList(asList("a1", "a2"), asList("a3", "a4")),
+        asList(asList(asList("a1", "a2"), asList("a3", "a4"))),
+        1
+    });
+    expected.add(new Object[]{
+        asList("b1"),
+        asList(asList("b1", "b2"), asList("b3", "b4")),
+        asList(asList(asList("b1", "b2"), asList("b3", "b4")), 
asList(asList("b5", "b6"), asList("b7", "b8"))),
+        2
+    });
+    expected.add(new Object[]{
+        asList("c1", "c2"),
+        asList(asList("c1", "c2"), asList("c3", "c4"), asList("c5", "c6")),
+        asList(asList(asList("c1", "c2"), asList("c3", "c4")), 
asList(asList("c5", "c6"), asList("c7", "c8")),
+            asList(asList("c9", "c10"), asList("c11", "c12"))),
+        3
+    });
+    List<String> nullList = new ArrayList<>();
+    nullList.add(null);
+    expected.add(new Object[]{
+        nullList,
+        asList(asList("d1", "d2"), asList("d3", "d4"), asList("d5", "d6"), 
asList("d7", "d8")),
+        asList(asList(asList("d1", "d2"))),
+        4
+    });
+
+    // test without limit and filters (i.e 
VectorizedRowBatch#selectedInUse=false)
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from complex_tbl";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0),
+        expected.get(1),
+        expected.get(2),
+        expected.get(3));
+
+    // test with filter
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl where c4 > 1 ";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1), expected.get(2), 
expected.get(3));
+
+    // test with limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl limit 3";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0), expected.get(1), 
expected.get(2));
+
+    // test with filters and limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl where c4 > 1 limit 2";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
   }
 
-  /**
-   * Test CLI kill command of a query that is running.
-   * We spawn 2 threads - one running the query and
-   * the other attempting to cancel.
-   * We're using a dummy udf to simulate a query,
-   * that runs for a sufficiently long time.
-   * @throws Exception
-   */
   @Test
-  public void testKillQuery() throws Exception {
-    Connection con = 
BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
-            System.getProperty("user.name"), "bar");
-    Connection con2 = 
BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
-            System.getProperty("user.name"), "bar");
-
-    String udfName = SleepMsUDF.class.getName();
-    Statement stmt1 = con.createStatement();
-    final Statement stmt2 = con2.createStatement();
-    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
-
-    String tblName = testDbName + "." + tableName;
-
-    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
-    stmt1.execute("create table " + tblName + " (int_col int, value string) ");
-    stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' 
into table " + tblName);
-
-
-    stmt1.close();
-    final Statement stmt = con.createStatement();
-    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
-    final ExceptionHolder tKillHolder = new ExceptionHolder();
-
-    // Thread executing the query
-    Thread tExecute = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          System.out.println("Executing query: ");
-          stmt.execute("set hive.llap.execution.mode = none");
-
-          // The test table has 500 rows, so total query time should be ~ 
500*500ms
-          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, 
t2.int_col " +
-                  "from " + tableName + " t1 join " + tableName + " t2 on 
t1.int_col = t2.int_col");
-        } catch (SQLException e) {
-          tExecuteHolder.throwable = e;
-        }
-      }
+  public void testTypesNestedInMapWithLimitAndFilters() throws Exception {
+    try (Statement statement = hs2Conn.createStatement()) {
+      statement.execute("CREATE TABLE complex_tbl2(c1 map<int, string>," +
+          " c2 map<int, array<string>>, " +
+          " c3 map<int, struct<f1:string,f2:string>>, c4 int) STORED AS ORC");
+
+      statement.executeUpdate("INSERT INTO complex_tbl2 VALUES " +
+          "(MAP(1, 'a1'), MAP(1, ARRAY('a1', 'a2')), MAP(1, 
NAMED_STRUCT('f1','a1', 'f2','a2')), " +
+          "1), " +
+          "(MAP(1, 'b1',2, 'b2'), MAP(1, ARRAY('b1', 'b2'), 2, ARRAY('b3') ), 
" +
+          "MAP(1, NAMED_STRUCT('f1','b1', 'f2','b2')), " +
+          "2), " +
+          "(MAP(1, 'c1',2, 'c2'), MAP(1, ARRAY('c1', 'c2'), 2, ARRAY('c3') ), 
" +
+          "MAP(1, NAMED_STRUCT('f1','c1', 'f2','c2'), 2, NAMED_STRUCT('f1', 
'c3', 'f2', 'c4') ), " +
+          "3)");
+
+    }
+
+    List<Object[]> expected = new ArrayList<>();
+    expected.add(new Object[]{
+        ImmutableMap.of(1, "a1"),
+        ImmutableMap.of(1, asList("a1", "a2")),
+        ImmutableMap.of(1, asList("a1", "a2")),
+        1,
     });
-    // Thread killing the query
-    Thread tKill = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(5000);
-          String queryId = ((HiveStatement) stmt).getQueryId();
-          System.out.println("Killing query: " + queryId);
-          stmt2.execute("kill query '" + queryId + "'");
-          stmt2.close();
-        } catch (Exception e) {
-          tKillHolder.throwable = e;
-        }
-      }
+    expected.add(new Object[]{
+        ImmutableMap.of(1, "b1", 2, "b2"),
+        ImmutableMap.of(1, asList("b1", "b2"), 2, asList("b3")),
+        ImmutableMap.of(1, asList("b1", "b2")),
+        2,
+    });
+    expected.add(new Object[]{
+        ImmutableMap.of(1, "c1", 2, "c2"),
+        ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3")),
+        ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3", "c4")),
+        3,
     });
 
-    tExecute.start();
-    tKill.start();
-    tExecute.join();
-    tKill.join();
-    stmt.close();
-    con2.close();
-    con.close();
 
-    assertNotNull("tExecute", tExecuteHolder.throwable);
-    assertNull("tCancel", tKillHolder.throwable);
+    // test without limit and filters (i.e. 
VectorizedRowBatch#selectedInUse=false)
+    RowCollector2 rowCollector = new RowCollector2();
+    String query = "select * from complex_tbl2";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0), expected.get(1), 
expected.get(2));
+
+    // test with filter
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 where c4 > 1 ";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1), expected.get(2));
+
+    // test with limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 limit 2";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(0), expected.get(1));
+
+    // test with filters and limit
+    rowCollector = new RowCollector2();
+    query = "select * from complex_tbl2 where c4 > 1 limit 1";
+    processQuery(query, 1, rowCollector);
+    verifyResult(rowCollector.rows, expected.get(1));
+
+  }
+
+  private void verifyResult(List<Object[]> actual, Object[]... expected) {
+    assertEquals(expected.length, actual.size());
+    for (int i = 0; i < expected.length; i++) {
+      assertArrayEquals(expected[i], actual.get(i));
+    }
   }
 
 }
diff --git 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
index d9c5666bc40..014e49dafef 100644
--- 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
+++ 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java
@@ -39,11 +39,19 @@ public class LlapArrowBatchRecordReader extends 
LlapBaseRecordReader<ArrowWrappe
   private BufferAllocator allocator;
   private ArrowStreamReader arrowStreamReader;
 
+  //Allows client to provide and manage their own arrow BufferAllocator
   public LlapArrowBatchRecordReader(InputStream in, Schema schema, 
Class<ArrowWrapperWritable> clazz,
-      JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) 
throws IOException {
+      JobConf job, Closeable client, Socket socket, BufferAllocator allocator) 
throws IOException {
     super(in, schema, clazz, job, client, socket);
-    allocator = 
RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit);
+    this.allocator = allocator;
     this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), 
allocator);
+  } 
+
+  //Use the global arrow BufferAllocator
+  public LlapArrowBatchRecordReader(InputStream in, Schema schema, 
Class<ArrowWrapperWritable> clazz,
+      JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) 
throws IOException {
+    this(in, schema, clazz, job, client, socket, 
+        
RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit));
   }
 
   @Override
@@ -76,6 +84,9 @@ public class LlapArrowBatchRecordReader extends 
LlapBaseRecordReader<ArrowWrappe
   @Override
   public void close() throws IOException {
     arrowStreamReader.close();
+    //allocator.close() will throw exception unless all buffers have been 
released
+    //See org.apache.arrow.memory.BaseAllocator.close()
+    allocator.close();
   }
 
 }
diff --git 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
index fafbdee210a..7690599a80c 100644
--- 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
+++ 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java
@@ -25,16 +25,28 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import java.io.IOException;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory;
+import java.util.UUID;
 
 /*
  * Adapts an Arrow batch reader to a row reader
+ * Only used for testing
  */
 public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> 
{
 
   private LlapBaseInputFormat baseInputFormat;
 
   public LlapArrowRowInputFormat(long arrowAllocatorLimit) {
-    baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit);
+    BufferAllocator allocator = 
RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator(
+        //allocator name, use UUID for testing
+        UUID.randomUUID().toString(),
+        //No use for reservation, allocators claim memory from the same pool, 
+        //but allocate/releases are tracked per-allocator
+        0,
+        //Limit passed in by client
+        arrowAllocatorLimit);
+    baseInputFormat = new LlapBaseInputFormat(true, allocator);
   }
 
   @Override
diff --git 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 30f372003f0..46e6e024ba2 100644
--- 
a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ 
b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.InputSplitWithLocationInfo;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.Credentials;
@@ -107,6 +108,7 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
   private String query;
   private boolean useArrow;
   private long arrowAllocatorLimit;
+  private BufferAllocator allocator;
   private final Random rand = new Random();
 
   public static final String URL_KEY = "llap.if.hs2.connection";
@@ -128,11 +130,17 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     this.query = query;
   }
 
+  //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, 
BufferAllocator) instead
   public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) {
     this.useArrow = useArrow;
     this.arrowAllocatorLimit = arrowAllocatorLimit;
   }
 
+  public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) {
+    this.useArrow = useArrow;
+    this.allocator = allocator;
+  }
+
   public LlapBaseInputFormat() {
     this.useArrow = false;
   }
@@ -209,10 +217,19 @@ public class LlapBaseInputFormat<V extends 
WritableComparable<?>>
     @SuppressWarnings("rawtypes")
     LlapBaseRecordReader recordReader;
     if(useArrow) {
-      recordReader = new LlapArrowBatchRecordReader(
-          socket.getInputStream(), llapSplit.getSchema(),
-          ArrowWrapperWritable.class, job, llapClient, socket,
-          arrowAllocatorLimit);
+      if(allocator != null) {
+        //Client provided their own allocator
+        recordReader = new LlapArrowBatchRecordReader(
+            socket.getInputStream(), llapSplit.getSchema(),
+            ArrowWrapperWritable.class, job, llapClient, socket,
+            allocator);
+      } else {
+        //Client did not provide their own allocator, use constructor for 
global allocator
+        recordReader = new LlapArrowBatchRecordReader(
+            socket.getInputStream(), llapSplit.getSchema(),
+            ArrowWrapperWritable.class, job, llapClient, socket,
+            arrowAllocatorLimit);
+      }
     } else {
       recordReader = new LlapBaseRecordReader(socket.getInputStream(),
           llapSplit.getSchema(), BytesWritable.class, job, llapClient, 
(java.io.Closeable)socket);
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java 
b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
index 1b3a3ebb269..4cd8a61c8f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java
@@ -20,11 +20,12 @@ package org.apache.hadoop.hive.llap;
 
 import java.io.IOException;
 
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.NonNullableStructVector;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
 import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
 import org.apache.hadoop.io.Writable;
-import java.nio.channels.WritableByteChannel;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
 import org.slf4j.Logger;
@@ -47,15 +48,28 @@ public class LlapArrowRecordWriter<K extends Writable, V 
extends Writable>
   public static final Logger LOG = 
LoggerFactory.getLogger(LlapArrowRecordWriter.class);
 
   ArrowStreamWriter arrowStreamWriter;
-  WritableByteChannel out;
+  WritableByteChannelAdapter out;
+  BufferAllocator allocator;
+  NonNullableStructVector rootVector;
 
-  public LlapArrowRecordWriter(WritableByteChannel out) {
+  public LlapArrowRecordWriter(WritableByteChannelAdapter out) {
     this.out = out;
   }
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    arrowStreamWriter.close();
+    try {
+      arrowStreamWriter.close();
+    } finally {
+      rootVector.close();
+      //bytesLeaked should always be 0
+      long bytesLeaked = allocator.getAllocatedMemory();
+      if(bytesLeaked != 0) {
+        LOG.error("Arrow memory leaked bytes: {}", bytesLeaked);
+        throw new IllegalStateException("Arrow memory leaked bytes:" + 
bytesLeaked);
+      }
+      allocator.close();
+    }
   }
 
   @Override
@@ -64,6 +78,9 @@ public class LlapArrowRecordWriter<K extends Writable, V 
extends Writable>
     if (arrowStreamWriter == null) {
       VectorSchemaRoot vectorSchemaRoot = 
arrowWrapperWritable.getVectorSchemaRoot();
       arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out);
+      allocator = arrowWrapperWritable.getAllocator();
+      this.out.setAllocator(allocator);
+      rootVector = arrowWrapperWritable.getRootVector();
     }
     arrowStreamWriter.writeBatch();
   }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java 
b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
index 57da1d9f6d8..b07ce5b07c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java
@@ -18,13 +18,14 @@
 
 package org.apache.hadoop.hive.llap;
 
-import io.netty.buffer.Unpooled;
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.Semaphore;
+import org.apache.arrow.memory.BufferAllocator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ public class WritableByteChannelAdapter implements 
WritableByteChannel {
   private final Semaphore writeResources;
   private boolean closed = false;
   private final String id;
+  private BufferAllocator allocator;
 
   private ChannelFutureListener writeListener = new ChannelFutureListener() {
     @Override
@@ -82,12 +84,18 @@ public class WritableByteChannelAdapter implements 
WritableByteChannel {
     this.id = id;
   }
 
+  /**
+   * Sets the Arrow allocator that {@link #write(ByteBuffer)} takes its outgoing buffers from.
+   * NOTE(review): write() dereferences this field unconditionally and the field declaration has
+   * no initializer, so this presumably has to be called before the first write - confirm.
+   *
+   * @param allocator allocator used for the buffers handed to Netty
+   */
+  public void setAllocator(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
   @Override
   public int write(ByteBuffer src) throws IOException {
     int size = src.remaining();
     //Down the semaphore or block until available
     takeWriteResources(1);
-    chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener);
+    ByteBuf buf = null;
+    try {
+      //Copy into a buffer obtained from the Arrow allocator so the bytes are accounted against it
+      buf = allocator.buffer(size);
+      buf.writeBytes(src);
+    } catch (RuntimeException e) {
+      //Nothing was handed to Netty, so writeListener will never run for this write:
+      //give back the buffer and the write resource taken above before propagating
+      if (buf != null) {
+        buf.release();
+      }
+      writeResources.release();
+      throw e;
+    }
+    chc.writeAndFlush(buf).addListener(writeListener);
     return size;
   }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
new file mode 100644
index 00000000000..1603703ec7e
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.filesink;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.hive.llap.LlapOutputFormatService;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.hive.ql.io.arrow.Serializer;
+import static org.apache.hadoop.hive.llap.LlapOutputFormat.LLAP_OF_ID_KEY;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.core.layout.AbstractStringLayout;
+
+/**
+ * Native Vectorized File Sink operator implementation for Arrow.
+ * Assumes output to LlapOutputFormatService.
+ *
+ * <p>Each incoming {@link VectorizedRowBatch} is converted to an Arrow batch by a
+ * {@link Serializer} and passed to the {@link RecordWriter} that
+ * {@link LlapOutputFormatService} returns for this operator's attempt id.
+ **/
+public class VectorFileSinkArrowOperator extends TerminalOperator<FileSinkDesc>
+    implements Serializable, VectorizationOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  private VectorizationContext vContext;
+  private VectorFileSinkDesc vectorDesc;
+  public static final Logger LOG = LoggerFactory.getLogger(VectorFileSinkArrowOperator.class.getName());
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  //Converts VectorizedRowBatch to Arrow; created in initializeOp
+  private transient Serializer converter;
+  //Looked up lazily from LlapOutputFormatService on first use
+  private transient RecordWriter recordWriter;
+  //True once process() has written at least one batch
+  private transient boolean wroteData;
+  private transient String attemptId;
+
+  public VectorFileSinkArrowOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) {
+    this(ctx);
+    this.conf = (FileSinkDesc) conf;
+    this.vContext = vContext;
+    this.vectorDesc = (VectorFileSinkDesc) vectorDesc;
+  }
+
+  /** Kryo ctor. */
+  @VisibleForTesting
+  public VectorFileSinkArrowOperator() {
+    super();
+  }
+
+  public VectorFileSinkArrowOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  @Override
+  public VectorizationContext getInputVectorizationContext() {
+    return vContext;
+  }
+
+  /**
+   * Reads the attempt id from the configuration and builds the Arrow {@link Serializer}
+   * from the field names and types of the first input object inspector.
+   *
+   * @throws RuntimeException wrapping any failure while building the serializer
+   */
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    super.initializeOp(hconf);
+    //attemptId identifies a RecordWriter initialized by LlapOutputFormatService
+    this.attemptId = hconf.get(LLAP_OF_ID_KEY);
+    try {
+      //Initialize column names and types
+      StructObjectInspector schema = (StructObjectInspector) inputObjInspectors[0];
+      List<? extends StructField> structFields = schema.getAllStructFieldRefs();
+      List<TypeInfo> typeInfos = new ArrayList<>(structFields.size());
+      List<String> fieldNames = new ArrayList<>(structFields.size());
+      for (StructField structField : structFields) {
+        fieldNames.add(structField.getFieldName());
+        typeInfos.add(TypeInfoUtils.getTypeInfoFromObjectInspector(structField.getFieldObjectInspector()));
+      }
+      //Initialize an Arrow serializer
+      converter = new Serializer(hconf, attemptId, typeInfos, fieldNames);
+    } catch (Exception e) {
+      LOG.error("Unable to initialize VectorFileSinkArrowOperator", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Converts one {@link VectorizedRowBatch} to an Arrow batch and writes it to the record writer.
+   *
+   * @param data the batch to write; must be a {@link VectorizedRowBatch}
+   * @param tag unused
+   * @throws RuntimeException wrapping any failure during conversion or write
+   */
+  @Override
+  public void process(Object data, int tag) throws HiveException {
+    VectorizedRowBatch batch = (VectorizedRowBatch) data;
+    try {
+      if(recordWriter == null) {
+        recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId);
+      }
+      //Convert the VectorizedRowBatch to a handle for the Arrow batch
+      ArrowWrapperWritable writable = converter.serializeBatch(batch, true);
+      //Pass the handle to the LlapOutputFormatService recordWriter
+      recordWriter.write(null, writable);
+      //Remember that data went out, so closeOp does not send the schema-only batch
+      this.wroteData = true;
+    } catch(Exception e) {
+      LOG.error("Failed to convert VectorizedRowBatch to Arrow batch", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Closes the record writer. ArrowStreamReader expects at least the schema metadata, so if
+   * this op wrote no data a schema-only batch is sent first to close the stream gracefully.
+   *
+   * @throws RuntimeException wrapping any failure while writing the schema or closing the writer
+   */
+  @Override
+  protected void closeOp(boolean abort) throws HiveException {
+    try {
+      if(!wroteData) {
+        //Send a schema only batch to signal EOS with no data written
+        ArrowWrapperWritable writable = converter.emptyBatch();
+        if(recordWriter == null) {
+          recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId);
+        }
+        recordWriter.write(null, writable);
+      }
+    } catch(Exception e) {
+      LOG.error("Failed to write Arrow stream schema", e);
+      throw new RuntimeException(e);
+    } finally {
+      //recordWriter is still null when building the empty batch or looking up the writer failed
+      //above; skip the close so that failure propagates instead of a NullPointerException
+      if(recordWriter != null) {
+        try {
+          //Close the recordWriter with null Reporter
+          recordWriter.close(null);
+        } catch(Exception e) {
+          LOG.error("Failed to close Arrow stream", e);
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public VectorDesc getVectorDesc() {
+    return vectorDesc;
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.FILESINK;
+  }
+}
+
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
index a1a3327814c..dac7d9c18a3 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java
@@ -104,7 +104,6 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
   public void initialize(Configuration conf, Properties tbl) throws 
SerDeException {
     this.conf = conf;
 
-    rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
 
     final String columnNameProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMNS);
     final String columnTypeProperty = 
tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
@@ -134,8 +133,6 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe {
       fields.add(toField(columnNames.get(i), columnTypes.get(i)));
     }
 
-    serializer = new Serializer(this);
-    deserializer = new Deserializer(this);
   }
 
   private static Field toField(String name, TypeInfo typeInfo) {
@@ -257,6 +254,15 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe 
{
 
   @Override
   public ArrowWrapperWritable serialize(Object obj, ObjectInspector 
objInspector) {
+    if(serializer == null) {
+      try {
+        rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
+        serializer = new Serializer(this);
+      } catch(Exception e) {
+        LOG.error("Unable to initialize serializer for 
ArrowColumnarBatchSerDe");
+        throw new RuntimeException(e);
+      }
+    }
     return serializer.serialize(obj, objInspector);
   }
 
@@ -267,6 +273,14 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe 
{
 
   @Override
   public Object deserialize(Writable writable) {
+    //Lazily create the deserializer on first use so that initialize() allocates nothing
+    if(deserializer == null) {
+      try {
+        rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf);
+        deserializer = new Deserializer(this);
+      } catch(Exception e) {
+        LOG.error("Unable to initialize deserializer for ArrowColumnarBatchSerDe", e);
+        throw new RuntimeException(e);
+      }
+    }
     return deserializer.deserialize(writable);
   }
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
index dd490b1b909..53bee6b823f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hive.ql.io.arrow;
 
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.arrow.vector.complex.NonNullableStructVector;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -27,10 +29,19 @@ import java.io.IOException;
 
 public class ArrowWrapperWritable implements WritableComparable {
   private VectorSchemaRoot vectorSchemaRoot;
+  private BufferAllocator allocator;
+  private NonNullableStructVector rootVector;
 
   public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }
+
+  public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot, 
BufferAllocator allocator, NonNullableStructVector rootVector) {
+    this.vectorSchemaRoot = vectorSchemaRoot;
+    this.allocator = allocator;
+    this.rootVector = rootVector;
+  }
+
   public ArrowWrapperWritable() {}
 
   public VectorSchemaRoot getVectorSchemaRoot() {
@@ -41,6 +52,14 @@ public class ArrowWrapperWritable implements 
WritableComparable {
     this.vectorSchemaRoot = vectorSchemaRoot;
   }
 
+  /**
+   * Returns the allocator given to the three-argument constructor. The other constructors
+   * visible in this class do not set it, in which case this returns null.
+   */
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
+  /**
+   * Returns the root struct vector given to the three-argument constructor. The other
+   * constructors visible in this class do not set it, in which case this returns null.
+   */
+  public NonNullableStructVector getRootVector() {
+    return rootVector;
+  }
+
   @Override
   public void write(DataOutput dataOutput) throws IOException {
     throw new UnsupportedOperationException();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
index 7a432ac836b..5289b7c1efd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java
@@ -38,30 +38,37 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.NonNullableStructVector;
 import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.holders.DecimalHolder;
 import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
@@ -69,11 +76,15 @@ import 
org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.arrow.memory.BufferAllocator;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE;
+import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT;
 import static 
org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector;
 import static 
org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS;
 import static 
org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND;
@@ -85,31 +96,61 @@ import static 
org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStruc
 import static 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE;
 import static 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector;
 
-class Serializer {
+public class Serializer {
   private final int MAX_BUFFERED_ROWS;
-
-  // Schema
-  private final StructTypeInfo structTypeInfo;
-  private final int fieldSize;
+  private final static byte[] EMPTY_BYTES = new byte[0];
 
   // Hive columns
   private final VectorizedRowBatch vectorizedRowBatch;
   private final VectorAssignRow vectorAssignRow;
   private int batchSize;
+  private BufferAllocator allocator;
+  private List<TypeInfo> fieldTypeInfos;
+  private List<String> fieldNames;
+  private int fieldSize;
 
   private final StructVector rootVector;
+  private final DecimalHolder decimalHolder = new DecimalHolder();
+
+  /**
+   * Constructor for non-serde serialization: the caller supplies the schema directly and
+   * no row-buffering state (VectorizedRowBatch / VectorAssignRow) is created.
+   *
+   * @param conf source of HIVE_ARROW_BATCH_ALLOCATOR_LIMIT and of the root allocator
+   * @param attemptId name given to the child allocator created for this serializer
+   * @param typeInfos column types, positionally matching {@code fieldNames}
+   * @param fieldNames column names
+   */
+  public Serializer(Configuration conf, String attemptId, List<TypeInfo> typeInfos, List<String> fieldNames) {
+    this.fieldTypeInfos = typeInfos;
+    this.fieldNames = fieldNames;
+    //Keep fieldSize in step with the schema, as the serde constructor does
+    this.fieldSize = typeInfos.size();
+    long childAllocatorLimit = HiveConf.getLongVar(conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
+    //Use per-task allocator for accounting only, no need to reserve per-task memory
+    long childAllocatorReservation = 0L;
+    //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed
+    allocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf).newChildAllocator(
+        attemptId,
+        childAllocatorReservation,
+        childAllocatorLimit);
+    rootVector = StructVector.empty(null, allocator);
+    //These last fields are unused in non-serde usage
+    vectorizedRowBatch = null;
+    vectorAssignRow = null;
+    MAX_BUFFERED_ROWS = 0;
+  }
 
   Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException {
     MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE);
+    long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, 
HIVE_ARROW_BATCH_ALLOCATOR_LIMIT);
     ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of 
buffered columns: " + MAX_BUFFERED_ROWS);
+    String childAllocatorName = Thread.currentThread().getName();
+    //Use per-task allocator for accounting only, no need to reserve per-task 
memory
+    long childAllocatorReservation = 0L;
+    //Break out accounting of direct memory per-task, so we can check no 
memory is leaked when task is completed
+    allocator = serDe.rootAllocator.newChildAllocator(
+        childAllocatorName,
+        childAllocatorReservation,
+        childAllocatorLimit);
 
     // Schema
-    structTypeInfo = (StructTypeInfo) 
getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
-    List<TypeInfo> fieldTypeInfos = 
structTypeInfo.getAllStructFieldTypeInfos();
+    StructTypeInfo structTypeInfo = (StructTypeInfo) 
getTypeInfoFromObjectInspector(serDe.rowObjectInspector);
+    fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos();
+    fieldNames = structTypeInfo.getAllStructFieldNames();
     fieldSize = fieldTypeInfos.size();
-
     // Init Arrow stuffs
-    rootVector = StructVector.empty(null, serDe.rootAllocator);
+    rootVector = StructVector.empty(null, allocator);
 
     // Init Hive stuffs
     vectorizedRowBatch = new VectorizedRowBatch(fieldSize);
@@ -127,33 +168,66 @@ class Serializer {
     }
   }
 
-  private ArrowWrapperWritable serializeBatch() {
+  /**
+   * Builds a batch that carries schema information only: the root vector's value count is
+   * reset to 0 and every schema field gets a child vector allocated with initial capacity 0.
+   *
+   * @return a writable wrapping the root vector, together with this serializer's allocator
+   */
+  public ArrowWrapperWritable emptyBatch() {
+    rootVector.setValueCount(0);
+    for (int i = 0; i < fieldTypeInfos.size(); i++) {
+      final TypeInfo columnType = fieldTypeInfos.get(i);
+      final String columnName = fieldNames.get(i);
+      //addOrGet creates the child on first use and returns the existing one afterwards
+      final FieldVector childVector =
+          rootVector.addOrGet(columnName, toFieldType(columnType), FieldVector.class);
+      childVector.setInitialCapacity(0);
+      childVector.allocateNew();
+    }
+    return new ArrowWrapperWritable(new VectorSchemaRoot(rootVector), allocator, rootVector);
+  }
+
+  //Used for both:
+  //1. VectorizedRowBatch constructed by batching rows
+  //2. VectorizedRowBatch provided from upstream (isNative)
+  public ArrowWrapperWritable serializeBatch(VectorizedRowBatch 
vectorizedRowBatch, boolean isNative) {
     rootVector.setValueCount(0);
 
     for (int fieldIndex = 0; fieldIndex < vectorizedRowBatch.projectionSize; 
fieldIndex++) {
       final int projectedColumn = 
vectorizedRowBatch.projectedColumns[fieldIndex];
       final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn];
-      final TypeInfo fieldTypeInfo = 
structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex);
-      final String fieldName = 
structTypeInfo.getAllStructFieldNames().get(fieldIndex);
+      final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex);
+      final String fieldName = fieldNames.get(fieldIndex);
       final FieldType fieldType = toFieldType(fieldTypeInfo);
+      //Reuse existing FieldVector buffers
+      //since we always call setValue or setNull for each row
+      boolean fieldExists = false;
+      if(rootVector.getChild(fieldName) != null) {
+        fieldExists = true;
+      }
       final FieldVector arrowVector = rootVector.addOrGet(fieldName, 
fieldType, FieldVector.class);
-      arrowVector.setInitialCapacity(batchSize);
-      arrowVector.allocateNew();
-      write(arrowVector, hiveVector, fieldTypeInfo, batchSize);
+      if(fieldExists) {
+        arrowVector.setValueCount(isNative ? vectorizedRowBatch.size : 
batchSize);
+      } else {
+        arrowVector.setInitialCapacity(isNative ? vectorizedRowBatch.size : 
batchSize);
+        arrowVector.allocateNew();
+      }
+      write(arrowVector, hiveVector, fieldTypeInfo, isNative ? 
vectorizedRowBatch.size : batchSize, vectorizedRowBatch, isNative);
+    }
+    if(!isNative) {
+      //Only mutate batches that are constructed by this serde
+      vectorizedRowBatch.reset();
+      rootVector.setValueCount(batchSize);
+    } else {
+      rootVector.setValueCount(vectorizedRowBatch.size);
     }
-    vectorizedRowBatch.reset();
-    rootVector.setValueCount(batchSize);
 
     batchSize = 0;
     VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector);
-    return new ArrowWrapperWritable(vectorSchemaRoot);
+    return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector);
   }
 
-  private FieldType toFieldType(TypeInfo typeInfo) {
+  private static FieldType toFieldType(TypeInfo typeInfo) {
     return new FieldType(true, toArrowType(typeInfo), null);
   }
 
-  private ArrowType toArrowType(TypeInfo typeInfo) {
+  private static ArrowType toArrowType(TypeInfo typeInfo) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE:
         switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) {
@@ -207,38 +281,43 @@ class Serializer {
     }
   }
 
-  private void write(FieldVector arrowVector, ColumnVector hiveVector, 
TypeInfo typeInfo, int size) {
+  private void write(FieldVector arrowVector, ColumnVector hiveVector, 
TypeInfo typeInfo, int size,
+      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE:
-        writePrimitive(arrowVector, hiveVector, typeInfo, size);
+        writePrimitive(arrowVector, hiveVector, typeInfo, size, 
vectorizedRowBatch, isNative);
         break;
       case LIST:
-        writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, 
(ListTypeInfo) typeInfo, size);
+        writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, 
(ListTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
         break;
       case STRUCT:
-        writeStruct((NonNullableStructVector) arrowVector, 
(StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size);
+        writeStruct((NonNullableStructVector) arrowVector, 
(StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size, 
vectorizedRowBatch, isNative);
         break;
       case UNION:
-        writeUnion(arrowVector, hiveVector, typeInfo, size);
+        writeUnion(arrowVector, hiveVector, typeInfo, size, 
vectorizedRowBatch, isNative);
         break;
       case MAP:
-        writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, 
(MapTypeInfo) typeInfo, size);
+        writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, 
(MapTypeInfo) typeInfo, size, vectorizedRowBatch, isNative);
         break;
       default:
         throw new IllegalArgumentException();
-    }
+      }
   }
 
   private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, 
MapTypeInfo typeInfo,
-      int size) {
+      int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo);
     final ListColumnVector structListVector = toStructListVector(hiveVector);
 
-    write(arrowVector, structListVector, structListTypeInfo, size);
+    write(arrowVector, structListVector, structListTypeInfo, size, 
vectorizedRowBatch, isNative);
 
     final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
     for (int rowIndex = 0; rowIndex < size; rowIndex++) {
-      if (hiveVector.isNull[rowIndex]) {
+      int selectedIndex = rowIndex;
+      if (vectorizedRowBatch.selectedInUse) {
+        selectedIndex = vectorizedRowBatch.selected[rowIndex];
+      }
+      if (hiveVector.isNull[selectedIndex]) {
         BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0);
       } else {
         BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex);
@@ -247,7 +326,7 @@ class Serializer {
   }
 
   private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, 
TypeInfo typeInfo,
-      int size) {
+      int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
     final List<TypeInfo> objectTypeInfos = 
unionTypeInfo.getAllUnionObjectTypeInfos();
     final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector;
@@ -257,11 +336,11 @@ class Serializer {
     final ColumnVector hiveObjectVector = hiveObjectVectors[tag];
     final TypeInfo objectTypeInfo = objectTypeInfos.get(tag);
 
-    write(arrowVector, hiveObjectVector, objectTypeInfo, size);
+    write(arrowVector, hiveObjectVector, objectTypeInfo, size, 
vectorizedRowBatch, isNative);
   }
 
   private void writeStruct(NonNullableStructVector arrowVector, 
StructColumnVector hiveVector,
-      StructTypeInfo typeInfo, int size) {
+      StructTypeInfo typeInfo, int size, VectorizedRowBatch 
vectorizedRowBatch, boolean isNative) {
     final List<String> fieldNames = typeInfo.getAllStructFieldNames();
     final List<TypeInfo> fieldTypeInfos = 
typeInfo.getAllStructFieldTypeInfos();
     final ColumnVector[] hiveFieldVectors = hiveVector.fields;
@@ -291,7 +370,7 @@ class Serializer {
               toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class);
       arrowFieldVector.setInitialCapacity(size);
       arrowFieldVector.allocateNew();
-      write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size);
+      write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size, 
vectorizedRowBatch, isNative);
     }
 
     final ArrowBuf validityBuffer = arrowVector.getValidityBuffer();
@@ -304,235 +383,332 @@ class Serializer {
     }
   }
 
-  private void writeList(ListVector arrowVector, ListColumnVector hiveVector, 
ListTypeInfo typeInfo,
-      int size) {
+  /**
+   * Produces a shallow copy of a batch that is re-indexed for the child
+   * vector of a multi-valued column (for example the element vector of a
+   * ListColumnVector).
+   *
+   * <p>The batch-level selected[] names parent rows, but every parent row
+   * maps to lengths[row] consecutive child rows starting at offsets[row], so
+   * the child vector can hold more rows than the batch itself. The copy
+   * shares cols and the projection arrays with the source; only selected[]
+   * and size are rebuilt so that they enumerate, in order, the child rows
+   * that belong to the selected parent rows.
+   *
+   * @param sourceVrb batch whose first {@code size} entries of selected[]
+   *        are read unconditionally (the caller visible in this class only
+   *        invokes this method when selectedInUse is set)
+   * @param multiValuedColumnVector column supplying offsets[] and lengths[]
+   * @return a new batch whose size is the total number of child rows
+   */
+  private static VectorizedRowBatch correctSelectedAndSize(
+      VectorizedRowBatch sourceVrb,
+      MultiValuedColumnVector multiValuedColumnVector) {
+
+    final VectorizedRowBatch copy =
+        new VectorizedRowBatch(sourceVrb.numCols, sourceVrb.size);
+    copy.cols = sourceVrb.cols;
+    copy.endOfFile = sourceVrb.endOfFile;
+    copy.projectedColumns = sourceVrb.projectedColumns;
+    copy.projectionSize = sourceVrb.projectionSize;
+    copy.selectedInUse = sourceVrb.selectedInUse;
+    copy.setPartitionInfo(sourceVrb.getDataColumnCount(),
+        sourceVrb.getPartitionColumnCount());
+
+    final int[] parentSelected = sourceVrb.selected;
+
+    // Pass 1: total number of child rows reachable from the selected parents.
+    int childRowCount = 0;
+    for (int row = 0; row < sourceVrb.size; row++) {
+      childRowCount += multiValuedColumnVector.lengths[parentSelected[row]];
+    }
+
+    // Pass 2: expand each selected parent into its run of child indices.
+    final int[] childSelected = new int[childRowCount];
+    int cursor = 0;
+    for (int row = 0; row < sourceVrb.size; row++) {
+      final int parentIndex = parentSelected[row];
+      final long firstChild = multiValuedColumnVector.offsets[parentIndex];
+      final long childCount = multiValuedColumnVector.lengths[parentIndex];
+      for (int k = 0; k < childCount; k++) {
+        childSelected[cursor++] = (int) (firstChild + k);
+      }
+    }
+
+    copy.selected = childSelected;
+    copy.size = childRowCount;
+    return copy;
+  }
+
+  private void writeList(ListVector arrowVector, ListColumnVector hiveVector, 
ListTypeInfo typeInfo, int size,
+      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final int OFFSET_WIDTH = 4;
     final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo();
     final ColumnVector hiveElementVector = hiveVector.child;
     final FieldVector arrowElementVector =
-        (FieldVector) 
arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
-    arrowElementVector.setInitialCapacity(hiveVector.childCount);
+            (FieldVector) 
arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector();
+
+    VectorizedRowBatch correctedVrb = vectorizedRowBatch;
+    int correctedSize = hiveVector.childCount;
+    if (vectorizedRowBatch.selectedInUse) {
+      correctedVrb = correctSelectedAndSize(vectorizedRowBatch, hiveVector);
+      correctedSize = correctedVrb.size;
+    }
+    arrowElementVector.setInitialCapacity(correctedSize);
     arrowElementVector.allocateNew();
 
-    write(arrowElementVector, hiveElementVector, elementTypeInfo, 
hiveVector.childCount);
+    write(arrowElementVector, hiveElementVector, elementTypeInfo, 
correctedSize, correctedVrb, isNative);
 
     final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer();
     int nextOffset = 0;
 
     for (int rowIndex = 0; rowIndex < size; rowIndex++) {
-      if (hiveVector.isNull[rowIndex]) {
+      int selectedIndex = rowIndex;
+      if (vectorizedRowBatch.selectedInUse) {
+        selectedIndex = vectorizedRowBatch.selected[rowIndex];
+      }
+      if (hiveVector.isNull[selectedIndex]) {
         offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
       } else {
         offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset);
-        nextOffset += (int) hiveVector.lengths[rowIndex];
+        nextOffset += (int) hiveVector.lengths[selectedIndex];
         arrowVector.setNotNull(rowIndex);
       }
     }
     offsetBuffer.setInt(size * OFFSET_WIDTH, nextOffset);
   }
 
-  private void writePrimitive(FieldVector arrowVector, ColumnVector 
hiveVector, TypeInfo typeInfo,
-      int size) {
+  //Handle cases for both internally constructed
+  //and externally provided (isNative) VectorRowBatch
+  private void writePrimitive(FieldVector arrowVector, ColumnVector 
hiveVector, TypeInfo typeInfo, int size,
+      VectorizedRowBatch vectorizedRowBatch, boolean isNative) {
     final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory =
         ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory();
     switch (primitiveCategory) {
-      case BOOLEAN:
-        {
-          final BitVector bitVector = (BitVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              bitVector.setNull(i);
-            } else {
-              bitVector.set(i, (int) ((LongColumnVector) 
hiveVector).vector[i]);
-            }
-          }
+    case BOOLEAN:
+    {
+      if(isNative) {
+      writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, boolNullSetter, 
boolValueSetter, typeInfo);
+        return;
+      }
+      final BitVector bitVector = (BitVector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          boolNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          boolValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case BYTE:
-        {
-          final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              tinyIntVector.setNull(i);
-            } else {
-              tinyIntVector.set(i, (byte) ((LongColumnVector) 
hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case BYTE:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, byteNullSetter, 
byteValueSetter, typeInfo);
+        return;
+      }
+      final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          byteNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          byteValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case SHORT:
-        {
-          final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              smallIntVector.setNull(i);
-            } else {
-              smallIntVector.set(i, (short) ((LongColumnVector) 
hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case SHORT:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, shortNullSetter, 
shortValueSetter, typeInfo);
+        return;
+      }
+      final SmallIntVector smallIntVector = (SmallIntVector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          shortNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          shortValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case INT:
-        {
-          final IntVector intVector = (IntVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              intVector.setNull(i);
-            } else {
-              intVector.set(i, (int) ((LongColumnVector) 
hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case INT:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intNullSetter, 
intValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          intNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          intValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case LONG:
-        {
-          final BigIntVector bigIntVector = (BigIntVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              bigIntVector.setNull(i);
-            } else {
-              bigIntVector.set(i, ((LongColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case LONG:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, longNullSetter, 
longValueSetter, typeInfo);
+        return;
+      }
+      final BigIntVector bigIntVector = (BigIntVector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          longNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          longValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case FLOAT:
-        {
-          final Float4Vector float4Vector = (Float4Vector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              float4Vector.setNull(i);
-            } else {
-              float4Vector.set(i, (float) ((DoubleColumnVector) 
hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case FLOAT:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, floatNullSetter, 
floatValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          floatNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          floatValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case DOUBLE:
-        {
-          final Float8Vector float8Vector = (Float8Vector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              float8Vector.setNull(i);
-            } else {
-              float8Vector.set(i, ((DoubleColumnVector) hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case DOUBLE:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, 
doubleNullSetter, doubleValueSetter, typeInfo);
+        return;
+      }
+      final Float8Vector float8Vector = (Float8Vector) arrowVector;
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          doubleNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          doubleValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case STRING:
-      case VARCHAR:
-      case CHAR:
-        {
-          final VarCharVector varCharVector = (VarCharVector) arrowVector;
-          final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              varCharVector.setNull(i);
-            } else {
-              varCharVector.setSafe(i, bytesVector.vector[i], 
bytesVector.start[i], bytesVector.length[i]);
-            }
-          }
+      }
+    }
+    break;
+    case CHAR:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, charNullSetter, 
charValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          charNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          charValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case DATE:
-        {
-          final DateDayVector dateDayVector = (DateDayVector) arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              dateDayVector.setNull(i);
-            } else {
-              dateDayVector.set(i, (int) ((LongColumnVector) 
hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case STRING:
+    case VARCHAR:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, 
stringNullSetter, stringValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          stringNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          stringValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case TIMESTAMP:
-        {
-          final TimeStampMicroTZVector timeStampMicroTZVector = 
(TimeStampMicroTZVector) arrowVector;
-          final TimestampColumnVector timestampColumnVector = 
(TimestampColumnVector) hiveVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              timeStampMicroTZVector.setNull(i);
-            } else {
-              // Time = second + sub-second
-              final long secondInMillis = timestampColumnVector.getTime(i);
-              final long secondInMicros = (secondInMillis - secondInMillis % 
MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
-              final long subSecondInMicros = timestampColumnVector.getNanos(i) 
/ NS_PER_MICROS;
-
-              if ((secondInMillis > 0 && secondInMicros < 0) || 
(secondInMillis < 0 && secondInMicros > 0)) {
-                // If the timestamp cannot be represented in long microsecond, 
set it as a null value
-                timeStampMicroTZVector.setNull(i);
-              } else {
-                timeStampMicroTZVector.set(i, secondInMicros + 
subSecondInMicros);
-              }
-            }
-          }
+      }
+    }
+    break;
+    case DATE:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, dateNullSetter, 
dateValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          dateNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          dateValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case BINARY:
-        {
-          final VarBinaryVector varBinaryVector = (VarBinaryVector) 
arrowVector;
-          final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              varBinaryVector.setNull(i);
-            } else {
-              varBinaryVector.setSafe(i, bytesVector.vector[i], 
bytesVector.start[i], bytesVector.length[i]);
-            }
-          }
+      }
+    }
+    break;
+    case TIMESTAMP:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, 
timestampNullSetter, timestampValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          timestampNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          timestampValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case DECIMAL:
-        {
-          final DecimalVector decimalVector = (DecimalVector) arrowVector;
-          final int scale = decimalVector.getScale();
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              decimalVector.setNull(i);
-            } else {
-              decimalVector.set(i,
-                  ((DecimalColumnVector) 
hiveVector).vector[i].getHiveDecimal().bigDecimalValue().setScale(scale));
-            }
-          }
+      }
+    }
+    break;
+    case BINARY:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, 
binaryNullSetter, binaryValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          binaryNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          binaryValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
         }
-        break;
-      case INTERVAL_YEAR_MONTH:
-        {
-          final IntervalYearVector intervalYearVector = (IntervalYearVector) 
arrowVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              intervalYearVector.setNull(i);
-            } else {
-              intervalYearVector.set(i, (int) ((LongColumnVector) 
hiveVector).vector[i]);
-            }
-          }
+      }
+    }
+    break;
+    case DECIMAL:
+    {
+      if(isNative) {
+        if(hiveVector instanceof DecimalColumnVector) {
+          writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, 
decimalNullSetter, decimalValueSetter, typeInfo);
+        } else {
+          writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, 
decimalNullSetter, decimal64ValueSetter, typeInfo);
         }
-        break;
-      case INTERVAL_DAY_TIME:
-        {
-          final IntervalDayVector intervalDayVector = (IntervalDayVector) 
arrowVector;
-          final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
-              (IntervalDayTimeColumnVector) hiveVector;
-          for (int i = 0; i < size; i++) {
-            if (hiveVector.isNull[i]) {
-              intervalDayVector.setNull(i);
-            } else {
-              final long totalSeconds = 
intervalDayTimeColumnVector.getTotalSeconds(i);
-              final long days = totalSeconds / SECOND_PER_DAY;
-              final long millis =
-                  (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
-                      intervalDayTimeColumnVector.getNanos(i) / NS_PER_MILLIS;
-              intervalDayVector.set(i, (int) days, (int) millis);
-            }
-          }
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          decimalNullSetter.accept(i, arrowVector, hiveVector);
+        } else if(hiveVector instanceof DecimalColumnVector) {
+          decimalValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
+        } else if(hiveVector instanceof Decimal64ColumnVector) {
+          decimal64ValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo);
+        } else {
+          throw new IllegalArgumentException("Unsupported vector column type: 
" + hiveVector.getClass().getName());
         }
-        break;
-      case VOID:
-      case UNKNOWN:
-      case TIMESTAMPLOCALTZ:
-      default:
-        throw new IllegalArgumentException();
+      }
+    }
+    break;
+    case INTERVAL_YEAR_MONTH:
+    {
+      if(isNative) {
+       writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, 
intervalYearMonthNullSetter, intervalYearMonthValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          intervalYearMonthNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          intervalYearMonthValueSetter.accept(i, i, arrowVector, hiveVector, 
typeInfo);
+        }
+      }
+    }
+    break;
+    case INTERVAL_DAY_TIME:
+    {
+      if(isNative) {
+        writeGeneric(arrowVector, hiveVector, size, 
vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, 
intervalDayTimeNullSetter, intervalDayTimeValueSetter, typeInfo);
+        return;
+      }
+      for (int i = 0; i < size; i++) {
+        if (hiveVector.isNull[i]) {
+          intervalDayTimeNullSetter.accept(i, arrowVector, hiveVector);
+        } else {
+          intervalDayTimeValueSetter.accept(i, i, arrowVector, hiveVector, 
typeInfo);
+        }
+      }
+    }
+    break;
+    case VOID:
+    case UNKNOWN:
+    case TIMESTAMPLOCALTZ:
+    default:
+      throw new IllegalArgumentException();
     }
   }
 
@@ -540,7 +716,7 @@ class Serializer {
     // if row is null, it means there are no more rows (closeOp()).
     // another case can be that the buffer is full.
     if (obj == null) {
-      return serializeBatch();
+      return serializeBatch(vectorizedRowBatch, false);
     }
     List<Object> standardObjects = new ArrayList<Object>();
     ObjectInspectorUtils.copyToStandardObject(standardObjects, obj,
@@ -549,8 +725,241 @@ class Serializer {
     vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, 
fieldSize);
     batchSize++;
     if (batchSize == MAX_BUFFERED_ROWS) {
-      return serializeBatch();
+      return serializeBatch(vectorizedRowBatch, false);
     }
     return null;
   }
+
+  /**
+   * Populates {@code fieldVector} from {@code hiveVector} using the supplied
+   * per-type null and value setters.
+   *
+   * <p>Output slots are always the logical positions {@code 0..size-1}.
+   * The source position is entry 0 for a repeating vector,
+   * {@code selected[logical]} when {@code selectedInUse} is set, and the
+   * logical position itself otherwise.
+   *
+   * @param fieldVector   Arrow vector being written
+   * @param hiveVector    Hive column vector being read
+   * @param size          number of logical rows to write
+   * @param selectedInUse whether {@code selected} maps logical rows to
+   *                      batch rows
+   * @param selected      logical-to-batch index map; read only when
+   *                      {@code selectedInUse} is true and the vector is not
+   *                      repeating
+   * @param nullSetter    marks one output slot as null
+   * @param valueSetter   copies one source row into one output slot
+   * @param typeInfo      passed through unchanged to {@code valueSetter}
+   */
+  private static void writeGeneric(final FieldVector fieldVector,
+      final ColumnVector hiveVector, final int size,
+      final boolean selectedInUse, final int[] selected,
+      final IntAndVectorsConsumer nullSetter,
+      final IntIntAndVectorsConsumer valueSetter, TypeInfo typeInfo) {
+    final boolean[] inputIsNull = hiveVector.isNull;
+    final int[] sel = selected;
+
+    if (hiveVector.isRepeating) {
+      if (hiveVector.noNulls || !inputIsNull[0]) {
+        for (int i = 0; i < size; i++) {
+          //Fill n rows with value in row 0
+          valueSetter.accept(i, 0, fieldVector, hiveVector, typeInfo);
+        }
+      } else {
+        for (int i = 0; i < size; i++) {
+          //Fill n rows with NULL
+          nullSetter.accept(i, fieldVector, hiveVector);
+        }
+      }
+      return;
+    }
+
+    if (hiveVector.noNulls) {
+      if (selectedInUse) {
+        for (int logical = 0; logical < size; logical++) {
+          final int batchIndex = sel[logical];
+          //Add row batchIndex
+          valueSetter.accept(logical, batchIndex, fieldVector, hiveVector,
+              typeInfo);
+        }
+      } else {
+        for (int batchIndex = 0; batchIndex < size; batchIndex++) {
+          //Add row batchIndex
+          valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector,
+              typeInfo);
+        }
+      }
+    } else {
+      if (selectedInUse) {
+        for (int logical = 0; logical < size; logical++) {
+          final int batchIndex = sel[logical];
+          if (inputIsNull[batchIndex]) {
+            //Add NULL at the output slot. The slot is the logical position,
+            //not the batch row: using batchIndex here would null the wrong
+            //Arrow entry and leave the intended one unwritten.
+            nullSetter.accept(logical, fieldVector, hiveVector);
+          } else {
+            //Add row batchIndex
+            valueSetter.accept(logical, batchIndex, fieldVector, hiveVector,
+                typeInfo);
+          }
+        }
+      } else {
+        for (int batchIndex = 0; batchIndex < size; batchIndex++) {
+          if (inputIsNull[batchIndex]) {
+            //Add NULL
+            nullSetter.accept(batchIndex, fieldVector, hiveVector);
+          } else {
+            //Add row batchIndex
+            valueSetter.accept(batchIndex, batchIndex, fieldVector,
+                hiveVector, typeInfo);
+          }
+        }
+      }
+    }
+  }
+
+  // Null/value setter pairs, one pair per primitive category.
+  // A null setter marks arrowVector[dst] as null; a value setter copies
+  // hiveVector[src] into arrowVector[dst].
+
+  //bool
+  private static final IntAndVectorsConsumer boolNullSetter =
+      (dst, arrow, hive) -> ((BitVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer boolValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final BitVector target = (BitVector) arrow;
+        target.set(dst, (int) ((LongColumnVector) hive).vector[src]);
+      };
+
+  //byte
+  private static final IntAndVectorsConsumer byteNullSetter =
+      (dst, arrow, hive) -> ((TinyIntVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer byteValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final TinyIntVector target = (TinyIntVector) arrow;
+        target.set(dst, (byte) ((LongColumnVector) hive).vector[src]);
+      };
+
+  //short
+  private static final IntAndVectorsConsumer shortNullSetter =
+      (dst, arrow, hive) -> ((SmallIntVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer shortValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final SmallIntVector target = (SmallIntVector) arrow;
+        target.set(dst, (short) ((LongColumnVector) hive).vector[src]);
+      };
+
+  //int
+  private static final IntAndVectorsConsumer intNullSetter =
+      (dst, arrow, hive) -> ((IntVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer intValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final IntVector target = (IntVector) arrow;
+        target.set(dst, (int) ((LongColumnVector) hive).vector[src]);
+      };
+
+  //long
+  private static final IntAndVectorsConsumer longNullSetter =
+      (dst, arrow, hive) -> ((BigIntVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer longValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final BigIntVector target = (BigIntVector) arrow;
+        target.set(dst, ((LongColumnVector) hive).vector[src]);
+      };
+
+  //float
+  private static final IntAndVectorsConsumer floatNullSetter =
+      (dst, arrow, hive) -> ((Float4Vector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer floatValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final Float4Vector target = (Float4Vector) arrow;
+        target.set(dst, (float) ((DoubleColumnVector) hive).vector[src]);
+      };
+
+  //double
+  private static final IntAndVectorsConsumer doubleNullSetter =
+      (dst, arrow, hive) -> ((Float8Vector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer doubleValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final Float8Vector target = (Float8Vector) arrow;
+        target.set(dst, ((DoubleColumnVector) hive).vector[src]);
+      };
+
+  //string/varchar
+  private static final IntAndVectorsConsumer stringNullSetter =
+      (dst, arrow, hive) -> ((VarCharVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer stringValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final BytesColumnVector source = (BytesColumnVector) hive;
+        final VarCharVector target = (VarCharVector) arrow;
+        target.setSafe(dst, source.vector[src], source.start[src],
+            source.length[src]);
+      };
+
+  //fixed-length CHAR
+  private static final IntAndVectorsConsumer charNullSetter =
+      (dst, arrow, hive) -> ((VarCharVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer charValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final BytesColumnVector source = (BytesColumnVector) hive;
+        final VarCharVector target = (VarCharVector) arrow;
+        final byte[] raw = source.vector[src];
+        final int rawLength = source.length[src];
+        final int rawStart = source.start[src];
+
+        // A missing byte array is written as an empty string (then padded).
+        final boolean absent = raw == null;
+        final byte[] bytes = absent ? EMPTY_BYTES : raw;
+        final int start = absent ? 0 : rawStart;
+        final int length = absent ? 0 : rawLength;
+
+        // Pad on the right up to the declared CHAR(n) length.
+        final int paddedLength = ((CharTypeInfo) type).getLength();
+        final byte[] padded =
+            StringExpr.padRight(bytes, start, length, paddedLength);
+        target.setSafe(dst, padded, 0, padded.length);
+      };
+
+  //date
+  private static final IntAndVectorsConsumer dateNullSetter =
+      (dst, arrow, hive) -> ((DateDayVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer dateValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final DateDayVector target = (DateDayVector) arrow;
+        target.set(dst, (int) ((LongColumnVector) hive).vector[src]);
+      };
+
+  //timestamp
+  private static final IntAndVectorsConsumer timestampNullSetter =
+      (dst, arrow, hive) -> ((TimeStampMicroTZVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer timestampValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final TimeStampMicroTZVector target = (TimeStampMicroTZVector) arrow;
+        final TimestampColumnVector source = (TimestampColumnVector) hive;
+        // Time = whole seconds + sub-second part, both in microseconds.
+        final long millis = source.getTime(src);
+        final long wholeSecondMicros =
+            (millis - millis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS;
+        final long fractionMicros = source.getNanos(src) / NS_PER_MICROS;
+        // A sign flip means the multiplication overflowed a long.
+        final boolean overflowed = (millis > 0 && wholeSecondMicros < 0)
+            || (millis < 0 && wholeSecondMicros > 0);
+        if (overflowed) {
+          // Not representable in long microseconds: emit null instead.
+          target.setNull(dst);
+        } else {
+          target.set(dst, wholeSecondMicros + fractionMicros);
+        }
+      };
+
+  //binary
+  private static final IntAndVectorsConsumer binaryNullSetter =
+      (dst, arrow, hive) -> ((VarBinaryVector) arrow).setNull(dst);
+  private static final IntIntAndVectorsConsumer binaryValueSetter =
+      (dst, src, arrow, hive, type) -> {
+        final BytesColumnVector source = (BytesColumnVector) hive;
+        final VarBinaryVector target = (VarBinaryVector) arrow;
+        target.setSafe(dst, source.vector[src], source.start[src],
+            source.length[src]);
+      };
+
+  //decimal and decimal64
+  private static final IntAndVectorsConsumer decimalNullSetter =
+      (i, arrowVector, hiveVector)
+      -> ((DecimalVector) arrowVector).setNull(i);
+
+  // Copies DecimalColumnVector row j into DecimalVector slot i, rescaled to
+  // the Arrow vector's scale. Not static: it uses the instance's
+  // decimalHolder and allocator.
+  private final IntIntAndVectorsConsumer decimalValueSetter =
+      (i, j, arrowVector, hiveVector, typeInfo)
+      -> {
+    final DecimalVector decimalVector = (DecimalVector) arrowVector;
+    final int scale = decimalVector.getScale();
+
+    // Read the SOURCE row (j). Indexing with the destination slot (i) picks
+    // the wrong row whenever the two differ (selected[] or repeating input).
+    final HiveDecimalWritable writable =
+        ((DecimalColumnVector) hiveVector).vector[j];
+    decimalHolder.precision = writable.precision();
+    decimalHolder.scale = scale;
+    // NOTE(review): the buffer is only stored in decimalHolder and is
+    // released when this try block exits; confirm whether the holder is
+    // still needed at all.
+    try (ArrowBuf arrowBuf = allocator.buffer(DecimalHolder.WIDTH)) {
+      decimalHolder.buffer = arrowBuf;
+      // Scale the unscaled value up to the target scale of the Arrow vector.
+      final BigInteger bigInteger =
+          new BigInteger(writable.getInternalStorage()).
+          multiply(BigInteger.TEN.pow(scale - writable.scale()));
+      decimalVector.set(i, new BigDecimal(bigInteger, scale));
+    }
+  };
+
+  // Copies Decimal64ColumnVector row j into DecimalVector slot i,
+  // interpreting the stored long with the Arrow vector's scale.
+  private static final IntIntAndVectorsConsumer decimal64ValueSetter =
+      (i, j, arrowVector, hiveVector, typeInfo)
+      -> {
+    final DecimalVector decimalVector = (DecimalVector) arrowVector;
+    final int scale = decimalVector.getScale();
+    final HiveDecimalWritable writable = new HiveDecimalWritable();
+    writable.setFromLongAndScale(
+        ((Decimal64ColumnVector) hiveVector).vector[j], scale);
+    decimalVector.set(i,
+        writable.getHiveDecimal().bigDecimalValue().setScale(scale));
+  };
+
+  //interval year
+  private static final IntAndVectorsConsumer intervalYearMonthNullSetter =
+      (i, arrowVector, hiveVector)
+      -> ((IntervalYearVector) arrowVector).setNull(i);
+  // Declared final like every other setter so the shared static cannot be
+  // reassigned.
+  private static final IntIntAndVectorsConsumer intervalYearMonthValueSetter =
+      (i, j, arrowVector, hiveVector, typeInfo)
+      -> ((IntervalYearVector) arrowVector).set(i,
+          (int) ((LongColumnVector) hiveVector).vector[j]);
+
+  //interval day
+  private static final IntAndVectorsConsumer intervalDayTimeNullSetter =
+      (i, arrowVector, hiveVector)
+      -> ((IntervalDayVector) arrowVector).setNull(i);
+  // Splits the source row's total seconds and nanos into whole days plus the
+  // remaining milliseconds, which is what IntervalDayVector.set receives.
+  private static final IntIntAndVectorsConsumer intervalDayTimeValueSetter =
+      (i, j, arrowVector, hiveVector, typeInfo)
+      -> {
+    final IntervalDayVector intervalDayVector =
+        (IntervalDayVector) arrowVector;
+    final IntervalDayTimeColumnVector intervalDayTimeColumnVector =
+        (IntervalDayTimeColumnVector) hiveVector;
+    final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(j);
+    final long days = totalSeconds / SECOND_PER_DAY;
+    final long millis =
+        (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND +
+            intervalDayTimeColumnVector.getNanos(j) / NS_PER_MILLIS;
+    intervalDayVector.set(i, (int) days, (int) millis);
+  };
+
+  /**
+   * Callback used for setting null at {@code arrowVector[i]}.
+   * The Hive vector is passed along for signature symmetry with
+   * {@link IntIntAndVectorsConsumer}.
+   */
+  @FunctionalInterface
+  private interface IntAndVectorsConsumer {
+    void accept(int i, FieldVector arrowVector, ColumnVector hiveVector);
+  }
+
+  /**
+   * Callback used to copy the value {@code hiveVector[j]} into
+   * {@code arrowVector[i]}. Two indices are needed because the Hive row may
+   * be reached through the batch's selected[] array (or be entry 0 of a
+   * repeating vector) while the Arrow slot is the output position.
+   */
+  @FunctionalInterface
+  private interface IntIntAndVectorsConsumer {
+    void accept(int i, int j, FieldVector arrowVector,
+        ColumnVector hiveVector, TypeInfo typeInfo);
+  }
+
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 1946cecc084..2dd12ef1918 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import 
org.apache.hadoop.hive.ql.exec.vector.filesink.VectorFileSinkArrowOperator;
 import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator;
 import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator;
 import 
org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyStringOperator;
@@ -4120,6 +4121,48 @@ public class Vectorizer implements PhysicalPlanResolver {
     return true;
   }
 
+  /**
+   * Decides whether a file sink qualifies for the native vectorized Arrow file sink operator.
+   *
+   * <p>All of the following must hold: the table's serde class is
+   * {@code org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe}, the
+   * {@code HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED} setting is true, and the
+   * configured execution engine is "tez" (case-insensitive).
+   *
+   * @param fileSinkDesc descriptor whose table info supplies the serde class name
+   * @param isTezOrSpark not consulted by this check
+   * @param vContext not consulted by this check
+   * @param vectorDesc not consulted by this check
+   * @return true when the Arrow specialization may be used
+   */
+  private boolean checkForArrowFileSink(FileSinkDesc fileSinkDesc,
+      boolean isTezOrSpark, VectorizationContext vContext,
+      VectorFileSinkDesc vectorDesc) throws HiveException {
+
+    // Various restrictions.
+
+    final boolean isVectorizationFileSinkArrowNativeEnabled =
+        HiveConf.getBoolVar(hiveConf,
+            HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED);
+
+    final String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+
+    final String serdeClassName = fileSinkDesc.getTableInfo().getSerdeClassName();
+
+    // Constant-first comparisons: should either lookup yield null, the sink simply does not
+    // qualify instead of failing with a NullPointerException.
+    return "org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe".equals(serdeClassName) &&
+        isVectorizationFileSinkArrowNativeEnabled &&
+        "tez".equalsIgnoreCase(engine);
+  }
+
+  /**
+   * Builds the vectorized Arrow file sink operator that replaces the given file sink.
+   *
+   * <p>The new operator is created from {@code op}'s compilation context and configuration.
+   * Any exception raised during creation is logged and rethrown wrapped in a
+   * {@link HiveException}.
+   *
+   * @param op the operator being vectorized
+   * @param vContext vectorization context handed to the operator factory
+   * @param desc file sink descriptor (not consulted here; {@code op.getConf()} is used)
+   * @param vectorDesc vector file sink descriptor handed to the operator factory
+   * @return the newly created {@code VectorFileSinkArrowOperator}
+   */
+  private Operator<? extends OperatorDesc> specializeArrowFileSinkOperator(
+      Operator<? extends OperatorDesc> op, VectorizationContext vContext, FileSinkDesc desc,
+      VectorFileSinkDesc vectorDesc) throws HiveException {
+
+    final Class<? extends Operator<?>> arrowSinkClass = VectorFileSinkArrowOperator.class;
+
+    try {
+      return OperatorFactory.getVectorOperator(
+          arrowSinkClass, op.getCompilationOpContext(), op.getConf(),
+          vContext, vectorDesc);
+    } catch (Exception e) {
+      LOG.info("Vectorizer vectorizeOperator file sink class exception " +
+          arrowSinkClass.getSimpleName() + " exception " + e);
+      throw new HiveException(e);
+    }
+  }
+
   private boolean usesVectorUDFAdaptor(VectorExpression vecExpr) {
     if (vecExpr == null) {
       return false;
@@ -5130,9 +5173,20 @@ public class Vectorizer implements PhysicalPlanResolver {
             FileSinkDesc fileSinkDesc = (FileSinkDesc) op.getConf();
 
             VectorFileSinkDesc vectorFileSinkDesc = new VectorFileSinkDesc();
-            vectorOp = OperatorFactory.getVectorOperator(
-                op.getCompilationOpContext(), fileSinkDesc, vContext, 
vectorFileSinkDesc);
-            isNative = false;
+            boolean isArrowSpecialization =
+                checkForArrowFileSink(fileSinkDesc, isTezOrSpark, vContext, 
vectorFileSinkDesc);
+
+            if (isArrowSpecialization) {
+              vectorOp =
+                  specializeArrowFileSinkOperator(
+                      op, vContext, fileSinkDesc, vectorFileSinkDesc);
+              isNative = true;
+            } else {
+              vectorOp =
+                  OperatorFactory.getVectorOperator(
+                      op.getCompilationOpContext(), fileSinkDesc, vContext, 
vectorFileSinkDesc);
+              isNative = false;
+            }
           }
           break;
         case LIMIT:
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
index 10484ac9252..bf20aae5195 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.arrow;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.arrow.vector.VarCharVector;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
@@ -102,6 +103,7 @@ public class TestArrowColumnarBatchSerDe {
       {text(""), charW("", 10), varcharW("", 10)},
       {text("Hello"), charW("Hello", 10), varcharW("Hello", 10)},
       {text("world!"), charW("world!", 10), varcharW("world!", 10)},
+      {text("안녕?"), charW("안녕?", 10), varcharW("안녕?", 10)},
       {null, null, null},
   };
 
@@ -525,6 +527,31 @@ public class TestArrowColumnarBatchSerDe {
     initAndSerializeAndDeserialize(schema, DECIMAL_ROWS);
   }
 
+  /**
+   * Round-trips 1000 randomly generated decimal(38,10) values through the serde.
+   * Each value is a random sign followed by 28 integer digits and 10 fraction digits.
+   */
+  @Test
+  public void testRandomPrimitiveDecimal() throws SerDeException {
+    final String[][] schema = {
+        {"decimal1", "decimal(38,10)"},
+    };
+
+    final int rowCount = 1000;
+    final int integerDigits = 28;
+    final int fractionDigits = 10;
+
+    final Object[][] randomDecimals = new Object[rowCount][];
+    final Random rng = new Random();
+    for (int row = 0; row < rowCount; row++) {
+      final StringBuilder literal = new StringBuilder();
+      literal.append(rng.nextBoolean() ? '+' : '-');
+      for (int d = 0; d < integerDigits; d++) {
+        literal.append(rng.nextInt(10));
+      }
+      literal.append('.');
+      for (int d = 0; d < fractionDigits; d++) {
+        literal.append(rng.nextInt(10));
+      }
+      final HiveDecimal value = HiveDecimal.create(literal.toString());
+      randomDecimals[row] = new Object[] {decimalW(value)};
+    }
+
+    initAndSerializeAndDeserialize(schema, randomDecimals);
+  }
+
   @Test
   public void testPrimitiveBoolean() throws SerDeException {
     String[][] schema = {
@@ -768,6 +795,32 @@ public class TestArrowColumnarBatchSerDe {
     initAndSerializeAndDeserialize(schema, toMap(BINARY_ROWS));
   }
 
+  /**
+   * Serializes two char(10) values and checks that each entry of the resulting Arrow
+   * vector equals the padded value of the corresponding input row.
+   */
+  @Test
+  public void testPrimitiveCharPadding() throws SerDeException {
+    String[][] schema = {
+        {"char1", "char(10)"},
+    };
+
+    HiveCharWritable[][] rows = new HiveCharWritable[][] {
+        {charW("Hello", 10)}, {charW("world!", 10)}};
+    ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe();
+    StructObjectInspector rowOI = initSerDe(serDe, schema);
+
+    ArrowWrapperWritable serialized = null;
+    for (Object[] row : rows) {
+      serialized = serDe.serialize(row, rowOI);
+    }
+    // Pass null to complete a batch
+    if (serialized == null) {
+      serialized = serDe.serialize(null, rowOI);
+    }
+
+    VarCharVector varCharVector =
+        (VarCharVector) serialized.getVectorSchemaRoot().getFieldVectors().get(0);
+    for (int i = 0; i < rows.length; i++) {
+      // Decode explicitly as UTF-8 instead of the platform default charset so the comparison
+      // does not depend on the JVM's locale settings.
+      // NOTE(review): assumes the serializer writes UTF-8 bytes into the vector - confirm.
+      assertEquals(rows[i][0].getPaddedValue().toString(),
+          new String(varCharVector.get(i), java.nio.charset.StandardCharsets.UTF_8));
+    }
+  }
+
   public void testMapDecimal() throws SerDeException {
     String[][] schema = {
         {"decimal_map", "map<string,decimal(38,10)>"},
diff --git 
a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java
 
b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java
index 162e8e6353a..6ae2969171e 100644
--- 
a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java
+++ 
b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java
@@ -129,6 +129,21 @@ public class StringExpr {
     return charCount;
   }
 
+  /**
+   * Returns a new array containing {@code length} bytes of {@code bytes}, starting at
+   * {@code start}, followed by blank (' ') bytes.
+   *
+   * <p>The number of blanks is {@code maxCharacterLength} minus the character count reported
+   * by {@code StringExpr.characterCount(bytes, start, length)}, or zero when the input already
+   * has at least {@code maxCharacterLength} characters. The input is never truncated.
+   *
+   * @param bytes source buffer
+   * @param start offset of the first byte to copy
+   * @param length number of bytes to copy
+   * @param maxCharacterLength character length to pad up to
+   * @return a freshly allocated, right-padded copy
+   */
+  public static byte[] padRight(byte[] bytes, int start, int length, int maxCharacterLength) {
+    final int charCount = StringExpr.characterCount(bytes, start, length);
+    final int padCount = Math.max(maxCharacterLength - charCount, 0);
+    final byte[] padded = new byte[length + padCount];
+    System.arraycopy(bytes, start, padded, 0, length);
+    // Everything after the copied bytes is blank padding.
+    java.util.Arrays.fill(padded, length, padded.length, (byte) ' ');
+    return padded;
+  }
+
   // A setVal with the same function signature as rightTrim, leftTrim, 
truncate, etc, below.
   // Useful for class generation via templates.
   public static void assign(BytesColumnVector outV, int i, byte[] bytes, int 
start, int length) {

Reply via email to