This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push: new f6aa916a171 HIVE-27765: Backport of HIVE-20052, HIVE-20093, HIVE-20203, HIVE-20290, HIVE-20300, HIVE-20312, HIVE-20044, HIVE-21966 to branch-3(#4772) f6aa916a171 is described below commit f6aa916a17109d4f64e7dd878b49912b79dd8d75 Author: Aman Raj <104416558+amanraj2...@users.noreply.github.com> AuthorDate: Tue Oct 10 10:41:32 2023 +0530 HIVE-27765: Backport of HIVE-20052, HIVE-20093, HIVE-20203, HIVE-20290, HIVE-20300, HIVE-20312, HIVE-20044, HIVE-21966 to branch-3(#4772) * HIVE-20052: Arrow serde should fill ArrowColumnVector(Decimal) with the given schema precision/scale * HIVE-20093: LlapOutputFomatService: Use ArrowBuf with Netty for Accounting * HIVE-20203: Arrow SerDe leaks a DirectByteBuffer * HIVE-20290: Lazy initialize ArrowColumnarBatchSerDe so it doesn't allocate buffers during GetSplits * HIVE-20300: VectorFileSinkArrowOperator * HIVE-20312: Allow arrow clients to use their own BufferAllocator with LlapOutputFormatService * HIVE-20044: Arrow Serde should pad char values and handle empty strings correctly * HIVE-21966: Llap external client - Arrow Serializer throws ArrayIndexOutOfBoundsException in some cases --------- Co-authored-by: Eric Wohlstadter <ewohlstad...@hortonworks.com> Co-authored-by: Nikhil Gupta <gup...@microsoft.com> Co-authored-by: Teddy Choi <pudi...@gmail.com> Co-authored-by: Shubham Chaurasia <schaura...@cloudera.com> Signed-off-by: Sankar Hariappan <sank...@apache.org> Closes (#4772) --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 8 +- .../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 45 +- .../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 7 +- .../apache/hive/jdbc/TestJdbcWithMiniLlapRow.java | 6 +- ...w.java => TestJdbcWithMiniLlapVectorArrow.java} | 297 ++++--- .../hive/llap/LlapArrowBatchRecordReader.java | 15 +- .../hadoop/hive/llap/LlapArrowRowInputFormat.java | 14 +- .../hadoop/hive/llap/LlapBaseInputFormat.java | 25 +- .../hadoop/hive/llap/LlapArrowRecordWriter.java | 25 +- .../hive/llap/WritableByteChannelAdapter.java | 12 +- .../filesink/VectorFileSinkArrowOperator.java | 180 +++++ .../hive/ql/io/arrow/ArrowColumnarBatchSerDe.java | 20 +- .../hive/ql/io/arrow/ArrowWrapperWritable.java | 19 + .../apache/hadoop/hive/ql/io/arrow/Serializer.java | 865 +++++++++++++++------ .../hive/ql/optimizer/physical/Vectorizer.java | 60 +- .../ql/io/arrow/TestArrowColumnarBatchSerDe.java | 53 ++ .../ql/exec/vector/expressions/StringExpr.java | 15 + 17 files changed, 1268 insertions(+), 398 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 3ec99315a27..bf20a78b588 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2682,6 +2682,8 @@ public class HiveConf extends Configuration { // For Arrow SerDe HIVE_ARROW_ROOT_ALLOCATOR_LIMIT("hive.arrow.root.allocator.limit", Long.MAX_VALUE, "Arrow root allocator memory size limitation in bytes."), + HIVE_ARROW_BATCH_ALLOCATOR_LIMIT("hive.arrow.batch.allocator.limit", 10_000_000_000L, + "Max bytes per arrow batch. This is a threshold, the memory is not pre-allocated."), HIVE_ARROW_BATCH_SIZE("hive.arrow.batch.size", 1000, "The number of rows sent in one Arrow batch."), // For Druid storage handler @@ -3690,7 +3692,11 @@ public class HiveConf extends Configuration { "internal use only. 
When false, don't suppress fatal exceptions like\n" + "NullPointerException, etc so the query will fail and assure it will be noticed", true), - + HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED( + "hive.vectorized.execution.filesink.arrow.native.enabled", true, + "This flag should be set to true to enable the native vectorization\n" + + "of queries using the Arrow SerDe and FileSink.\n" + + "The default value is true."), HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true, "This property has been extended to control " + "whether to check, convert, and normalize partition value to conform to its column type in " + "partition operations including but not limited to insert, such as alter, describe etc."), diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java index 5cf765d8eb8..fbcd229d224 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java @@ -102,38 +102,20 @@ import org.apache.hadoop.mapred.InputFormat; * by sub-classes in a {@link org.junit.BeforeClass} initializer */ public abstract class BaseJdbcWithMiniLlap { - private static MiniHS2 miniHS2 = null; + private static String dataFileDir; private static Path kvDataFilePath; private static Path dataTypesFilePath; - private static HiveConf conf = null; - private static Connection hs2Conn = null; + protected static MiniHS2 miniHS2 = null; + protected static HiveConf conf = null; + protected static Connection hs2Conn = null; // This method should be called by sub-classes in a @BeforeClass initializer - public static MiniHS2 beforeTest(boolean useArrow) throws Exception { + public static MiniHS2 beforeTest(HiveConf inputConf) throws Exception { + conf = inputConf; Class.forName(MiniHS2.getJdbcDriverName()); - - String confDir = "../../data/conf/llap/"; - if (confDir != null && !confDir.isEmpty()) { - HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml")); - System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation()); - } - - conf = new HiveConf(); - conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); - conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); - if(useArrow) { - conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); - } else { - conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false); - } - - conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() - + "/tez-site.xml")); - miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP); - dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); kvDataFilePath = new Path(dataFileDir, "kv1.txt"); dataTypesFilePath = new Path(dataFileDir, "datatypes.txt"); @@ -143,6 +125,19 @@ public abstract class BaseJdbcWithMiniLlap { return miniHS2; } + static HiveConf defaultConf() throws Exception { + String confDir = "../../data/conf/llap/"; + if (confDir != null && !confDir.isEmpty()) { + HiveConf.setHiveSiteLocation(new URL("file://"+ new File(confDir).toURI().getPath() + "/hive-site.xml")); + System.out.println("Setting hive-site: " + HiveConf.getHiveSiteLocation()); + } + HiveConf defaultConf = new HiveConf(); + defaultConf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + defaultConf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); + defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); 
+ return defaultConf; + } + @Before public void setUp() throws Exception { hs2Conn = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); @@ -549,6 +544,8 @@ public abstract class BaseJdbcWithMiniLlap { rowProcessor.process(row); ++rowCount; } + //In arrow-mode this will throw exception unless all buffers have been released + //See org.apache.hadoop.hive.llap.LlapArrowBatchRecordReader reader.close(); } LlapBaseInputFormat.close(handleId); diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java index 0c6acd8495c..3dcc4928b1a 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hive.llap.Row; import org.apache.hadoop.io.NullWritable; import org.junit.BeforeClass; import org.junit.Ignore; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.AfterClass; import org.junit.Test; @@ -62,9 +64,10 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { @BeforeClass public static void beforeTest() throws Exception { - HiveConf conf = new HiveConf(); + HiveConf conf = defaultConf(); + conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); MiniHS2.cleanupLocalDir(); - miniHS2 = BaseJdbcWithMiniLlap.beforeTest(true); + miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf); dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java index 809068fe3e7..d954d0e2fe6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapRow.java @@ -25,6 +25,8 @@ import org.junit.BeforeClass; import org.junit.Before; import org.junit.After; import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; /** * TestJdbcWithMiniLlap for llap Row format. 
@@ -33,7 +35,9 @@ public class TestJdbcWithMiniLlapRow extends BaseJdbcWithMiniLlap { @BeforeClass public static void beforeTest() throws Exception { - BaseJdbcWithMiniLlap.beforeTest(false); + HiveConf conf = defaultConf(); + conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, false); + BaseJdbcWithMiniLlap.beforeTest(conf); } @Override diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java similarity index 54% copy from itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java copy to itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java index 0c6acd8495c..35eda6cb0a6 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapVectorArrow.java @@ -18,69 +18,42 @@ package org.apache.hive.jdbc; +import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertArrayEquals; + import java.math.BigDecimal; + +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.Timestamp; + +import java.sql.Statement; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hive.llap.FieldDesc; import org.apache.hadoop.hive.llap.Row; import org.apache.hadoop.io.NullWritable; import org.junit.BeforeClass; -import org.junit.Ignore; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.junit.AfterClass; -import org.junit.Test; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Connection; -import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat; -import org.apache.hive.jdbc.miniHS2.MiniHS2; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.junit.Test; /** - * TestJdbcWithMiniLlap for Arrow format + * TestJdbcWithMiniLlap for Arrow format with vectorized output sink */ -@Ignore("unstable HIVE-23549") -public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { +public class TestJdbcWithMiniLlapVectorArrow extends BaseJdbcWithMiniLlap { - private static MiniHS2 miniHS2 = null; - private static final String tableName = "testJdbcMinihs2Tbl"; - private static String dataFileDir; - private static final String testDbName = "testJdbcMinihs2"; - - private static class ExceptionHolder { - Throwable throwable; - } @BeforeClass public static void beforeTest() throws Exception { - HiveConf conf = new HiveConf(); - MiniHS2.cleanupLocalDir(); - miniHS2 = BaseJdbcWithMiniLlap.beforeTest(true); - dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", ""); - - Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(), - System.getProperty("user.name"), "bar"); - Statement stmt = conDefault.createStatement(); - stmt.execute("drop database if exists " + testDbName + " cascade"); - stmt.execute("create database " + testDbName); - stmt.close(); - conDefault.close(); - } - - @AfterClass - public static void afterTest() { - if (miniHS2 != null && miniHS2.isStarted()) { - 
miniHS2.stop(); - } + HiveConf conf = defaultConf(); + conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true); + conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED, true); + BaseJdbcWithMiniLlap.beforeTest(conf); } @Override @@ -266,94 +239,172 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap { assertArrayEquals("X'01FF'".getBytes("UTF-8"), (byte[]) rowValues[22]); } - /** - * SleepMsUDF - */ - public static class SleepMsUDF extends UDF { - public Integer evaluate(int value, int ms) { - try { - Thread.sleep(ms); - } catch (InterruptedException e) { - // No-op - } - return value; + + @Test + public void testTypesNestedInListWithLimitAndFilters() throws Exception { + try (Statement statement = hs2Conn.createStatement()) { + statement.execute("CREATE TABLE complex_tbl(c1 array<string>, " + + "c2 array<struct<f1:string,f2:string>>, " + + "c3 array<array<struct<f1:string,f2:string>>>, " + + "c4 int) STORED AS ORC"); + + statement.executeUpdate("INSERT INTO complex_tbl VALUES " + + "(" + + "ARRAY('a1', 'a2', 'a3', null), " + + "ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 'f2','a4')), " + + "ARRAY((ARRAY(NAMED_STRUCT('f1','a1', 'f2','a2'), NAMED_STRUCT('f1','a3', 'f2','a4')))), " + + "1), " + + "(" + + "ARRAY('b1'), " + + "ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 'f2','b4')), " + + "ARRAY((ARRAY(NAMED_STRUCT('f1','b1', 'f2','b2'), NAMED_STRUCT('f1','b3', 'f2','b4'))), " + + "(ARRAY(NAMED_STRUCT('f1','b5', 'f2','b6'), NAMED_STRUCT('f1','b7', 'f2','b8')))), " + + "2), " + + "(" + + "ARRAY('c1', 'c2'), ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), NAMED_STRUCT('f1','c3', 'f2','c4'), " + + "NAMED_STRUCT('f1','c5', 'f2','c6')), ARRAY((ARRAY(NAMED_STRUCT('f1','c1', 'f2','c2'), " + + "NAMED_STRUCT('f1','c3', 'f2','c4'))), (ARRAY(NAMED_STRUCT('f1','c5', 'f2','c6'), " + + "NAMED_STRUCT('f1','c7', 'f2','c8'))), (ARRAY(NAMED_STRUCT('f1','c9', 'f2','c10'), " + + "NAMED_STRUCT('f1','c11', 'f2','c12')))), " + + "3), " + + "(" + + "ARRAY(null), " + + "ARRAY(NAMED_STRUCT('f1','d1', 'f2','d2'), NAMED_STRUCT('f1','d3', 'f2','d4'), " + + "NAMED_STRUCT('f1','d5', 'f2','d6'), NAMED_STRUCT('f1','d7', 'f2','d8')), " + + "ARRAY((ARRAY(NAMED_STRUCT('f1','d1', 'f2', 'd2')))), " + + "4)"); + } + + List<Object[]> expected = new ArrayList<>(); + expected.add(new Object[]{ + asList("a1", "a2", "a3", null), + asList(asList("a1", "a2"), asList("a3", "a4")), + asList(asList(asList("a1", "a2"), asList("a3", "a4"))), + 1 + }); + expected.add(new Object[]{ + asList("b1"), + asList(asList("b1", "b2"), asList("b3", "b4")), + asList(asList(asList("b1", "b2"), asList("b3", "b4")), asList(asList("b5", "b6"), asList("b7", "b8"))), + 2 + }); + expected.add(new Object[]{ + asList("c1", "c2"), + asList(asList("c1", "c2"), asList("c3", "c4"), asList("c5", "c6")), + asList(asList(asList("c1", "c2"), asList("c3", "c4")), asList(asList("c5", "c6"), asList("c7", "c8")), + asList(asList("c9", "c10"), asList("c11", "c12"))), + 3 + }); + List<String> nullList = new ArrayList<>(); + nullList.add(null); + expected.add(new Object[]{ + nullList, + asList(asList("d1", "d2"), asList("d3", "d4"), asList("d5", "d6"), asList("d7", "d8")), + asList(asList(asList("d1", "d2"))), + 4 + }); + + // test without limit and filters (i.e VectorizedRowBatch#selectedInUse=false) + RowCollector2 rowCollector = new RowCollector2(); + String query = "select * from complex_tbl"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(0), + 
expected.get(1), + expected.get(2), + expected.get(3)); + + // test with filter + rowCollector = new RowCollector2(); + query = "select * from complex_tbl where c4 > 1 "; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(1), expected.get(2), expected.get(3)); + + // test with limit + rowCollector = new RowCollector2(); + query = "select * from complex_tbl limit 3"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2)); + + // test with filters and limit + rowCollector = new RowCollector2(); + query = "select * from complex_tbl where c4 > 1 limit 2"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(1), expected.get(2)); + } - /** - * Test CLI kill command of a query that is running. - * We spawn 2 threads - one running the query and - * the other attempting to cancel. - * We're using a dummy udf to simulate a query, - * that runs for a sufficiently long time. - * @throws Exception - */ @Test - public void testKillQuery() throws Exception { - Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName), - System.getProperty("user.name"), "bar"); - Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName), - System.getProperty("user.name"), "bar"); - - String udfName = SleepMsUDF.class.getName(); - Statement stmt1 = con.createStatement(); - final Statement stmt2 = con2.createStatement(); - Path dataFilePath = new Path(dataFileDir, "kv1.txt"); - - String tblName = testDbName + "." + tableName; - - stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); - stmt1.execute("create table " + tblName + " (int_col int, value string) "); - stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' into table " + tblName); - - - stmt1.close(); - final Statement stmt = con.createStatement(); - final ExceptionHolder tExecuteHolder = new ExceptionHolder(); - final ExceptionHolder tKillHolder = new ExceptionHolder(); - - // Thread executing the query - Thread tExecute = new Thread(new Runnable() { - @Override - public void run() { - try { - System.out.println("Executing query: "); - stmt.execute("set hive.llap.execution.mode = none"); - - // The test table has 500 rows, so total query time should be ~ 500*500ms - stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " + - "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col"); - } catch (SQLException e) { - tExecuteHolder.throwable = e; - } - } + public void testTypesNestedInMapWithLimitAndFilters() throws Exception { + try (Statement statement = hs2Conn.createStatement()) { + statement.execute("CREATE TABLE complex_tbl2(c1 map<int, string>," + + " c2 map<int, array<string>>, " + + " c3 map<int, struct<f1:string,f2:string>>, c4 int) STORED AS ORC"); + + statement.executeUpdate("INSERT INTO complex_tbl2 VALUES " + + "(MAP(1, 'a1'), MAP(1, ARRAY('a1', 'a2')), MAP(1, NAMED_STRUCT('f1','a1', 'f2','a2')), " + + "1), " + + "(MAP(1, 'b1',2, 'b2'), MAP(1, ARRAY('b1', 'b2'), 2, ARRAY('b3') ), " + + "MAP(1, NAMED_STRUCT('f1','b1', 'f2','b2')), " + + "2), " + + "(MAP(1, 'c1',2, 'c2'), MAP(1, ARRAY('c1', 'c2'), 2, ARRAY('c3') ), " + + "MAP(1, NAMED_STRUCT('f1','c1', 'f2','c2'), 2, NAMED_STRUCT('f1', 'c3', 'f2', 'c4') ), " + + "3)"); + + } + + List<Object[]> expected = new ArrayList<>(); + expected.add(new Object[]{ + ImmutableMap.of(1, "a1"), + ImmutableMap.of(1, asList("a1", "a2")), + 
ImmutableMap.of(1, asList("a1", "a2")), + 1, }); - // Thread killing the query - Thread tKill = new Thread(new Runnable() { - @Override - public void run() { - try { - Thread.sleep(5000); - String queryId = ((HiveStatement) stmt).getQueryId(); - System.out.println("Killing query: " + queryId); - stmt2.execute("kill query '" + queryId + "'"); - stmt2.close(); - } catch (Exception e) { - tKillHolder.throwable = e; - } - } + expected.add(new Object[]{ + ImmutableMap.of(1, "b1", 2, "b2"), + ImmutableMap.of(1, asList("b1", "b2"), 2, asList("b3")), + ImmutableMap.of(1, asList("b1", "b2")), + 2, + }); + expected.add(new Object[]{ + ImmutableMap.of(1, "c1", 2, "c2"), + ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3")), + ImmutableMap.of(1, asList("c1", "c2"), 2, asList("c3", "c4")), + 3, }); - tExecute.start(); - tKill.start(); - tExecute.join(); - tKill.join(); - stmt.close(); - con2.close(); - con.close(); - assertNotNull("tExecute", tExecuteHolder.throwable); - assertNull("tCancel", tKillHolder.throwable); + // test without limit and filters (i.e. VectorizedRowBatch#selectedInUse=false) + RowCollector2 rowCollector = new RowCollector2(); + String query = "select * from complex_tbl2"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(0), expected.get(1), expected.get(2)); + + // test with filter + rowCollector = new RowCollector2(); + query = "select * from complex_tbl2 where c4 > 1 "; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(1), expected.get(2)); + + // test with limit + rowCollector = new RowCollector2(); + query = "select * from complex_tbl2 limit 2"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(0), expected.get(1)); + + // test with filters and limit + rowCollector = new RowCollector2(); + query = "select * from complex_tbl2 where c4 > 1 limit 1"; + processQuery(query, 1, rowCollector); + verifyResult(rowCollector.rows, expected.get(1)); + + } + + private void verifyResult(List<Object[]> actual, Object[]... 
expected) { + assertEquals(expected.length, actual.size()); + for (int i = 0; i < expected.length; i++) { + assertArrayEquals(expected[i], actual.get(i)); + } } } diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java index d9c5666bc40..014e49dafef 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java @@ -39,11 +39,19 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe private BufferAllocator allocator; private ArrowStreamReader arrowStreamReader; + //Allows client to provide and manage their own arrow BufferAllocator public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz, - JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { + JobConf job, Closeable client, Socket socket, BufferAllocator allocator) throws IOException { super(in, schema, clazz, job, client, socket); - allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit); + this.allocator = allocator; this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator); + } + + //Use the global arrow BufferAllocator + public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz, + JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { + this(in, schema, clazz, job, client, socket, + RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit)); } @Override @@ -76,6 +84,9 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe @Override public void close() throws IOException { arrowStreamReader.close(); + //allocator.close() will throw exception unless all buffers have been released + //See org.apache.arrow.memory.BaseAllocator.close() + allocator.close(); } } diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java index fafbdee210a..7690599a80c 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java @@ -25,16 +25,28 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import java.io.IOException; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory; +import java.util.UUID; /* * Adapts an Arrow batch reader to a row reader + * Only used for testing */ public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> { private LlapBaseInputFormat baseInputFormat; public LlapArrowRowInputFormat(long arrowAllocatorLimit) { - baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit); + BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator( + //allocator name, use UUID for testing + UUID.randomUUID().toString(), + //No use for reservation, allocators claim memory from the same pool, + //but allocate/releases are tracked per-allocator + 0, + //Limit passed in by client + arrowAllocatorLimit); + baseInputFormat = new 
LlapBaseInputFormat(true, allocator); } @Override diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 30f372003f0..46e6e024ba2 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.InputSplitWithLocationInfo; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.arrow.memory.BufferAllocator; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.Credentials; @@ -107,6 +108,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> private String query; private boolean useArrow; private long arrowAllocatorLimit; + private BufferAllocator allocator; private final Random rand = new Random(); public static final String URL_KEY = "llap.if.hs2.connection"; @@ -128,11 +130,17 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> this.query = query; } + //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator instead) public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) { this.useArrow = useArrow; this.arrowAllocatorLimit = arrowAllocatorLimit; } + public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) { + this.useArrow = useArrow; + this.allocator = allocator; + } + public LlapBaseInputFormat() { this.useArrow = false; } @@ -209,10 +217,19 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> @SuppressWarnings("rawtypes") LlapBaseRecordReader recordReader; if(useArrow) { - recordReader = new LlapArrowBatchRecordReader( - socket.getInputStream(), llapSplit.getSchema(), - ArrowWrapperWritable.class, job, llapClient, socket, - arrowAllocatorLimit); + if(allocator != null) { + //Client provided their own allocator + recordReader = new LlapArrowBatchRecordReader( + socket.getInputStream(), llapSplit.getSchema(), + ArrowWrapperWritable.class, job, llapClient, socket, + allocator); + } else { + //Client did not provide their own allocator, use constructor for global allocator + recordReader = new LlapArrowBatchRecordReader( + socket.getInputStream(), llapSplit.getSchema(), + ArrowWrapperWritable.class, job, llapClient, socket, + arrowAllocatorLimit); + } } else { recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket); diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java index 1b3a3ebb269..4cd8a61c8f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapArrowRecordWriter.java @@ -20,11 +20,12 @@ package org.apache.hadoop.hive.llap; import java.io.IOException; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.NonNullableStructVector; import org.apache.arrow.vector.ipc.ArrowStreamWriter; import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; import org.apache.hadoop.io.Writable; -import java.nio.channels.WritableByteChannel; import org.apache.hadoop.mapred.RecordWriter; import 
org.apache.hadoop.mapred.Reporter; import org.slf4j.Logger; @@ -47,15 +48,28 @@ public class LlapArrowRecordWriter<K extends Writable, V extends Writable> public static final Logger LOG = LoggerFactory.getLogger(LlapArrowRecordWriter.class); ArrowStreamWriter arrowStreamWriter; - WritableByteChannel out; + WritableByteChannelAdapter out; + BufferAllocator allocator; + NonNullableStructVector rootVector; - public LlapArrowRecordWriter(WritableByteChannel out) { + public LlapArrowRecordWriter(WritableByteChannelAdapter out) { this.out = out; } @Override public void close(Reporter reporter) throws IOException { - arrowStreamWriter.close(); + try { + arrowStreamWriter.close(); + } finally { + rootVector.close(); + //bytesLeaked should always be 0 + long bytesLeaked = allocator.getAllocatedMemory(); + if(bytesLeaked != 0) { + LOG.error("Arrow memory leaked bytes: {}", bytesLeaked); + throw new IllegalStateException("Arrow memory leaked bytes:" + bytesLeaked); + } + allocator.close(); + } } @Override @@ -64,6 +78,9 @@ public class LlapArrowRecordWriter<K extends Writable, V extends Writable> if (arrowStreamWriter == null) { VectorSchemaRoot vectorSchemaRoot = arrowWrapperWritable.getVectorSchemaRoot(); arrowStreamWriter = new ArrowStreamWriter(vectorSchemaRoot, null, out); + allocator = arrowWrapperWritable.getAllocator(); + this.out.setAllocator(allocator); + rootVector = arrowWrapperWritable.getRootVector(); } arrowStreamWriter.writeBatch(); } diff --git a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java index 57da1d9f6d8..b07ce5b07c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java +++ b/ql/src/java/org/apache/hadoop/hive/llap/WritableByteChannelAdapter.java @@ -18,13 +18,14 @@ package org.apache.hadoop.hive.llap; -import io.netty.buffer.Unpooled; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.util.concurrent.Semaphore; +import org.apache.arrow.memory.BufferAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,7 @@ public class WritableByteChannelAdapter implements WritableByteChannel { private final Semaphore writeResources; private boolean closed = false; private final String id; + private BufferAllocator allocator; private ChannelFutureListener writeListener = new ChannelFutureListener() { @Override @@ -82,12 +84,18 @@ public class WritableByteChannelAdapter implements WritableByteChannel { this.id = id; } + public void setAllocator(BufferAllocator allocator) { + this.allocator = allocator; + } + @Override public int write(ByteBuffer src) throws IOException { int size = src.remaining(); //Down the semaphore or block until available takeWriteResources(1); - chc.writeAndFlush(Unpooled.wrappedBuffer(src)).addListener(writeListener); + ByteBuf buf = allocator.buffer(size); + buf.writeBytes(src); + chc.writeAndFlush(buf).addListener(writeListener); return size; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java new file mode 100644 index 00000000000..1603703ec7e --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/filesink/VectorFileSinkArrowOperator.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.filesink; + +import java.io.Serializable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.TerminalOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.VectorDesc; +import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.io.arrow.ArrowWrapperWritable; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.hive.llap.LlapOutputFormatService; +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import java.util.List; +import java.util.ArrayList; +import org.apache.hadoop.hive.ql.io.arrow.Serializer; +import static org.apache.hadoop.hive.llap.LlapOutputFormat.LLAP_OF_ID_KEY; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.logging.log4j.core.layout.AbstractStringLayout; + +/** + * Native Vectorized File Sink operator implementation for Arrow. + * Assumes output to LlapOutputFormatService + **/ +public class VectorFileSinkArrowOperator extends TerminalOperator<FileSinkDesc> + implements Serializable, VectorizationOperator { + + private static final long serialVersionUID = 1L; + + private VectorizationContext vContext; + private VectorFileSinkDesc vectorDesc; + public static final Logger LOG = LoggerFactory.getLogger(VectorFileSinkArrowOperator.class.getName()); + + // The above members are initialized by the constructor and must not be + // transient. 
+ //--------------------------------------------------------------------------- + + private transient Serializer converter; + private transient RecordWriter recordWriter; + private transient boolean wroteData; + private transient String attemptId; + + public VectorFileSinkArrowOperator(CompilationOpContext ctx, OperatorDesc conf, + VectorizationContext vContext, VectorDesc vectorDesc) { + this(ctx); + this.conf = (FileSinkDesc) conf; + this.vContext = vContext; + this.vectorDesc = (VectorFileSinkDesc) vectorDesc; + } + + /** Kryo ctor. */ + @VisibleForTesting + public VectorFileSinkArrowOperator() { + super(); + } + + public VectorFileSinkArrowOperator(CompilationOpContext ctx) { + super(ctx); + } + + @Override + public VectorizationContext getInputVectorizationContext() { + return vContext; + } + + @Override + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + //attemptId identifies a RecordWriter initialized by LlapOutputFormatService + this.attemptId = hconf.get(LLAP_OF_ID_KEY); + try { + //Initialize column names and types + List<TypeInfo> typeInfos = new ArrayList<>(); + List<String> fieldNames = new ArrayList<>(); + StructObjectInspector schema = (StructObjectInspector) inputObjInspectors[0]; + for(int i = 0; i < schema.getAllStructFieldRefs().size(); i++) { + StructField structField = schema.getAllStructFieldRefs().get(i); + fieldNames.add(structField.getFieldName()); + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(structField.getFieldObjectInspector()); + typeInfos.add(typeInfo); + } + //Initialize an Arrow serializer + converter = new Serializer(hconf, attemptId, typeInfos, fieldNames); + } catch (Exception e) { + LOG.error("Unable to initialize VectorFileSinkArrowOperator"); + throw new RuntimeException(e); + } + } + + @Override + public void process(Object data, int tag) throws HiveException { + //ArrowStreamReader expects at least the schema metadata, if this op writes no data, + //we need to send the schema to close the stream gracefully + VectorizedRowBatch batch = (VectorizedRowBatch) data; + try { + if(recordWriter == null) { + recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId); + } + //Convert the VectorizedRowBatch to a handle for the Arrow batch + ArrowWrapperWritable writable = converter.serializeBatch(batch, true); + //Pass the handle to the LlapOutputFormatService recordWriter + recordWriter.write(null, writable); + this.wroteData = true; + } catch(Exception e) { + LOG.error("Failed to convert VectorizedRowBatch to Arrow batch"); + throw new RuntimeException(e); + } + } + + @Override + protected void closeOp(boolean abort) throws HiveException { + try { + if(!wroteData) { + //Send a schema only batch to signal EOS with no data written + ArrowWrapperWritable writable = converter.emptyBatch(); + if(recordWriter == null) { + recordWriter = LlapOutputFormatService.get().getWriter(this.attemptId); + } + recordWriter.write(null, writable); + } + } catch(Exception e) { + LOG.error("Failed to write Arrow stream schema"); + throw new RuntimeException(e); + } finally { + try { + //Close the recordWriter with null Reporter + recordWriter.close(null); + } catch(Exception e) { + LOG.error("Failed to close Arrow stream"); + throw new RuntimeException(e); + } + } + } + + @Override + public VectorDesc getVectorDesc() { + return vectorDesc; + } + + @Override + public OperatorType getType() { + return OperatorType.FILESINK; + } +} + diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java index a1a3327814c..dac7d9c18a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowColumnarBatchSerDe.java @@ -104,7 +104,6 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe { public void initialize(Configuration conf, Properties tbl) throws SerDeException { this.conf = conf; - rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf); final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS); final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); @@ -134,8 +133,6 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe { fields.add(toField(columnNames.get(i), columnTypes.get(i))); } - serializer = new Serializer(this); - deserializer = new Deserializer(this); } private static Field toField(String name, TypeInfo typeInfo) { @@ -257,6 +254,15 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe { @Override public ArrowWrapperWritable serialize(Object obj, ObjectInspector objInspector) { + if(serializer == null) { + try { + rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf); + serializer = new Serializer(this); + } catch(Exception e) { + LOG.error("Unable to initialize serializer for ArrowColumnarBatchSerDe"); + throw new RuntimeException(e); + } + } return serializer.serialize(obj, objInspector); } @@ -267,6 +273,14 @@ public class ArrowColumnarBatchSerDe extends AbstractSerDe { @Override public Object deserialize(Writable writable) { + if(deserializer == null) { + try { + rootAllocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf); + deserializer = new Deserializer(this); + } catch(Exception e) { + throw new RuntimeException(e); + } + } return deserializer.deserialize(writable); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java index dd490b1b909..53bee6b823f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/ArrowWrapperWritable.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql.io.arrow; +import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.hadoop.io.WritableComparable; +import org.apache.arrow.vector.complex.NonNullableStructVector; import java.io.DataInput; import java.io.DataOutput; @@ -27,10 +29,19 @@ import java.io.IOException; public class ArrowWrapperWritable implements WritableComparable { private VectorSchemaRoot vectorSchemaRoot; + private BufferAllocator allocator; + private NonNullableStructVector rootVector; public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot) { this.vectorSchemaRoot = vectorSchemaRoot; } + + public ArrowWrapperWritable(VectorSchemaRoot vectorSchemaRoot, BufferAllocator allocator, NonNullableStructVector rootVector) { + this.vectorSchemaRoot = vectorSchemaRoot; + this.allocator = allocator; + this.rootVector = rootVector; + } + public ArrowWrapperWritable() {} public VectorSchemaRoot getVectorSchemaRoot() { @@ -41,6 +52,14 @@ public class ArrowWrapperWritable implements WritableComparable { this.vectorSchemaRoot = vectorSchemaRoot; } + public BufferAllocator getAllocator() { + return allocator; + } + + public 
NonNullableStructVector getRootVector() { + return rootVector; + } + @Override public void write(DataOutput dataOutput) throws IOException { throw new UnsupportedOperationException(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index 7a432ac836b..5289b7c1efd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -38,30 +38,37 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.NonNullableStructVector; import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.holders.DecimalHolder; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.IntervalDayTimeColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; @@ -69,11 +76,15 @@ import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; +import org.apache.arrow.memory.BufferAllocator; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE; +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_ALLOCATOR_LIMIT; import static org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil.createColumnVector; import static 
org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MICROS_PER_MILLIS; import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.MILLIS_PER_SECOND; @@ -85,31 +96,61 @@ import static org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe.toStruc import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption.WRITABLE; import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfoFromObjectInspector; -class Serializer { +public class Serializer { private final int MAX_BUFFERED_ROWS; - - // Schema - private final StructTypeInfo structTypeInfo; - private final int fieldSize; + private final static byte[] EMPTY_BYTES = new byte[0]; // Hive columns private final VectorizedRowBatch vectorizedRowBatch; private final VectorAssignRow vectorAssignRow; private int batchSize; + private BufferAllocator allocator; + private List<TypeInfo> fieldTypeInfos; + private List<String> fieldNames; + private int fieldSize; private final StructVector rootVector; + private final DecimalHolder decimalHolder = new DecimalHolder(); + + //Constructor for non-serde serialization + public Serializer(Configuration conf, String attemptId, List<TypeInfo> typeInfos, List<String> fieldNames) { + this.fieldTypeInfos = typeInfos; + this.fieldNames = fieldNames; + long childAllocatorLimit = HiveConf.getLongVar(conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT); + //Use per-task allocator for accounting only, no need to reserve per-task memory + long childAllocatorReservation = 0L; + //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed + allocator = RootAllocatorFactory.INSTANCE.getRootAllocator(conf).newChildAllocator( + attemptId, + childAllocatorReservation, + childAllocatorLimit); + rootVector = StructVector.empty(null, allocator); + //These last fields are unused in non-serde usage + vectorizedRowBatch = null; + vectorAssignRow = null; + MAX_BUFFERED_ROWS = 0; + } Serializer(ArrowColumnarBatchSerDe serDe) throws SerDeException { MAX_BUFFERED_ROWS = HiveConf.getIntVar(serDe.conf, HIVE_ARROW_BATCH_SIZE); + long childAllocatorLimit = HiveConf.getLongVar(serDe.conf, HIVE_ARROW_BATCH_ALLOCATOR_LIMIT); ArrowColumnarBatchSerDe.LOG.info("ArrowColumnarBatchSerDe max number of buffered columns: " + MAX_BUFFERED_ROWS); + String childAllocatorName = Thread.currentThread().getName(); + //Use per-task allocator for accounting only, no need to reserve per-task memory + long childAllocatorReservation = 0L; + //Break out accounting of direct memory per-task, so we can check no memory is leaked when task is completed + allocator = serDe.rootAllocator.newChildAllocator( + childAllocatorName, + childAllocatorReservation, + childAllocatorLimit); // Schema - structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector); - List<TypeInfo> fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + StructTypeInfo structTypeInfo = (StructTypeInfo) getTypeInfoFromObjectInspector(serDe.rowObjectInspector); + fieldTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + fieldNames = structTypeInfo.getAllStructFieldNames(); fieldSize = fieldTypeInfos.size(); - // Init Arrow stuffs - rootVector = StructVector.empty(null, serDe.rootAllocator); + rootVector = StructVector.empty(null, allocator); // Init Hive stuffs vectorizedRowBatch = new VectorizedRowBatch(fieldSize); @@ -127,33 +168,66 @@ class Serializer { } } - private ArrowWrapperWritable serializeBatch() { + //Construct an emptyBatch which 
contains schema-only info + public ArrowWrapperWritable emptyBatch() { + rootVector.setValueCount(0); + for (int fieldIndex = 0; fieldIndex < fieldTypeInfos.size(); fieldIndex++) { + final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex); + final String fieldName = fieldNames.get(fieldIndex); + final FieldType fieldType = toFieldType(fieldTypeInfo); + final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class); + arrowVector.setInitialCapacity(0); + arrowVector.allocateNew(); + } + VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector); + return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector); + } + + //Used for both: + //1. VectorizedRowBatch constructed by batching rows + //2. VectorizedRowBatch provided from upstream (isNative) + public ArrowWrapperWritable serializeBatch(VectorizedRowBatch vectorizedRowBatch, boolean isNative) { rootVector.setValueCount(0); for (int fieldIndex = 0; fieldIndex < vectorizedRowBatch.projectionSize; fieldIndex++) { final int projectedColumn = vectorizedRowBatch.projectedColumns[fieldIndex]; final ColumnVector hiveVector = vectorizedRowBatch.cols[projectedColumn]; - final TypeInfo fieldTypeInfo = structTypeInfo.getAllStructFieldTypeInfos().get(fieldIndex); - final String fieldName = structTypeInfo.getAllStructFieldNames().get(fieldIndex); + final TypeInfo fieldTypeInfo = fieldTypeInfos.get(fieldIndex); + final String fieldName = fieldNames.get(fieldIndex); final FieldType fieldType = toFieldType(fieldTypeInfo); + //Reuse existing FieldVector buffers + //since we always call setValue or setNull for each row + boolean fieldExists = false; + if(rootVector.getChild(fieldName) != null) { + fieldExists = true; + } final FieldVector arrowVector = rootVector.addOrGet(fieldName, fieldType, FieldVector.class); - arrowVector.setInitialCapacity(batchSize); - arrowVector.allocateNew(); - write(arrowVector, hiveVector, fieldTypeInfo, batchSize); + if(fieldExists) { + arrowVector.setValueCount(isNative ? vectorizedRowBatch.size : batchSize); + } else { + arrowVector.setInitialCapacity(isNative ? vectorizedRowBatch.size : batchSize); + arrowVector.allocateNew(); + } + write(arrowVector, hiveVector, fieldTypeInfo, isNative ? 
vectorizedRowBatch.size : batchSize, vectorizedRowBatch, isNative); + } + if(!isNative) { + //Only mutate batches that are constructed by this serde + vectorizedRowBatch.reset(); + rootVector.setValueCount(batchSize); + } else { + rootVector.setValueCount(vectorizedRowBatch.size); } - vectorizedRowBatch.reset(); - rootVector.setValueCount(batchSize); batchSize = 0; VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(rootVector); - return new ArrowWrapperWritable(vectorSchemaRoot); + return new ArrowWrapperWritable(vectorSchemaRoot, allocator, rootVector); } - private FieldType toFieldType(TypeInfo typeInfo) { + private static FieldType toFieldType(TypeInfo typeInfo) { return new FieldType(true, toArrowType(typeInfo), null); } - private ArrowType toArrowType(TypeInfo typeInfo) { + private static ArrowType toArrowType(TypeInfo typeInfo) { switch (typeInfo.getCategory()) { case PRIMITIVE: switch (((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory()) { @@ -207,38 +281,43 @@ class Serializer { } } - private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size) { + private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, + VectorizedRowBatch vectorizedRowBatch, boolean isNative) { switch (typeInfo.getCategory()) { case PRIMITIVE: - writePrimitive(arrowVector, hiveVector, typeInfo, size); + writePrimitive(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative); break; case LIST: - writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size); + writeList((ListVector) arrowVector, (ListColumnVector) hiveVector, (ListTypeInfo) typeInfo, size, vectorizedRowBatch, isNative); break; case STRUCT: - writeStruct((NonNullableStructVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size); + writeStruct((NonNullableStructVector) arrowVector, (StructColumnVector) hiveVector, (StructTypeInfo) typeInfo, size, vectorizedRowBatch, isNative); break; case UNION: - writeUnion(arrowVector, hiveVector, typeInfo, size); + writeUnion(arrowVector, hiveVector, typeInfo, size, vectorizedRowBatch, isNative); break; case MAP: - writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size); + writeMap((ListVector) arrowVector, (MapColumnVector) hiveVector, (MapTypeInfo) typeInfo, size, vectorizedRowBatch, isNative); break; default: throw new IllegalArgumentException(); - } + } } private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo, - int size) { + int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo); final ListColumnVector structListVector = toStructListVector(hiveVector); - write(arrowVector, structListVector, structListTypeInfo, size); + write(arrowVector, structListVector, structListTypeInfo, size, vectorizedRowBatch, isNative); final ArrowBuf validityBuffer = arrowVector.getValidityBuffer(); for (int rowIndex = 0; rowIndex < size; rowIndex++) { - if (hiveVector.isNull[rowIndex]) { + int selectedIndex = rowIndex; + if (vectorizedRowBatch.selectedInUse) { + selectedIndex = vectorizedRowBatch.selected[rowIndex]; + } + if (hiveVector.isNull[selectedIndex]) { BitVectorHelper.setValidityBit(validityBuffer, rowIndex, 0); } else { BitVectorHelper.setValidityBitToOne(validityBuffer, rowIndex); @@ -247,7 +326,7 @@ class Serializer { } private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, 
TypeInfo typeInfo, - int size) { + int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo; final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos(); final UnionColumnVector hiveUnionVector = (UnionColumnVector) hiveVector; @@ -257,11 +336,11 @@ class Serializer { final ColumnVector hiveObjectVector = hiveObjectVectors[tag]; final TypeInfo objectTypeInfo = objectTypeInfos.get(tag); - write(arrowVector, hiveObjectVector, objectTypeInfo, size); + write(arrowVector, hiveObjectVector, objectTypeInfo, size, vectorizedRowBatch, isNative); } private void writeStruct(NonNullableStructVector arrowVector, StructColumnVector hiveVector, - StructTypeInfo typeInfo, int size) { + StructTypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final List<String> fieldNames = typeInfo.getAllStructFieldNames(); final List<TypeInfo> fieldTypeInfos = typeInfo.getAllStructFieldTypeInfos(); final ColumnVector[] hiveFieldVectors = hiveVector.fields; @@ -291,7 +370,7 @@ class Serializer { toFieldType(fieldTypeInfos.get(fieldIndex)), FieldVector.class); arrowFieldVector.setInitialCapacity(size); arrowFieldVector.allocateNew(); - write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size); + write(arrowFieldVector, hiveFieldVector, fieldTypeInfo, size, vectorizedRowBatch, isNative); } final ArrowBuf validityBuffer = arrowVector.getValidityBuffer(); @@ -304,235 +383,332 @@ class Serializer { } } - private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, - int size) { + // selected[] points to the valid/filtered/selected records at row level. + // for MultiValuedColumnVector such as ListColumnVector one record of vector points to multiple nested records. + // In child vectors we get these records in exploded manner i.e. the number of records in child vectors can have size more + // than actual the VectorizedRowBatch, consequently selected[] also needs to be readjusted. 
+ // This method creates a shallow copy of VectorizedRowBatch with corrected size and selected[] + + private static VectorizedRowBatch correctSelectedAndSize(VectorizedRowBatch sourceVrb, + MultiValuedColumnVector multiValuedColumnVector) { + + VectorizedRowBatch vrb = new VectorizedRowBatch(sourceVrb.numCols, sourceVrb.size); + vrb.cols = sourceVrb.cols; + vrb.endOfFile = sourceVrb.endOfFile; + vrb.projectedColumns = sourceVrb.projectedColumns; + vrb.projectionSize = sourceVrb.projectionSize; + vrb.selectedInUse = sourceVrb.selectedInUse; + vrb.setPartitionInfo(sourceVrb.getDataColumnCount(), sourceVrb.getPartitionColumnCount()); + + int correctedSize = 0; + final int[] srcVrbSelected = sourceVrb.selected; + for (int i = 0; i < sourceVrb.size; i++) { + correctedSize += multiValuedColumnVector.lengths[srcVrbSelected[i]]; + } + + int newIndex = 0; + final int[] selectedOffsetsCorrected = new int[correctedSize]; + for (int i = 0; i < sourceVrb.size; i++) { + long elementIndex = multiValuedColumnVector.offsets[srcVrbSelected[i]]; + long elementSize = multiValuedColumnVector.lengths[srcVrbSelected[i]]; + for (int j = 0; j < elementSize; j++) { + selectedOffsetsCorrected[newIndex++] = (int) (elementIndex + j); + } + } + vrb.selected = selectedOffsetsCorrected; + vrb.size = correctedSize; + return vrb; + } + + private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size, + VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final int OFFSET_WIDTH = 4; final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo(); final ColumnVector hiveElementVector = hiveVector.child; final FieldVector arrowElementVector = - (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector(); - arrowElementVector.setInitialCapacity(hiveVector.childCount); + (FieldVector) arrowVector.addOrGetVector(toFieldType(elementTypeInfo)).getVector(); + + VectorizedRowBatch correctedVrb = vectorizedRowBatch; + int correctedSize = hiveVector.childCount; + if (vectorizedRowBatch.selectedInUse) { + correctedVrb = correctSelectedAndSize(vectorizedRowBatch, hiveVector); + correctedSize = correctedVrb.size; + } + arrowElementVector.setInitialCapacity(correctedSize); arrowElementVector.allocateNew(); - write(arrowElementVector, hiveElementVector, elementTypeInfo, hiveVector.childCount); + write(arrowElementVector, hiveElementVector, elementTypeInfo, correctedSize, correctedVrb, isNative); final ArrowBuf offsetBuffer = arrowVector.getOffsetBuffer(); int nextOffset = 0; for (int rowIndex = 0; rowIndex < size; rowIndex++) { - if (hiveVector.isNull[rowIndex]) { + int selectedIndex = rowIndex; + if (vectorizedRowBatch.selectedInUse) { + selectedIndex = vectorizedRowBatch.selected[rowIndex]; + } + if (hiveVector.isNull[selectedIndex]) { offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset); } else { offsetBuffer.setInt(rowIndex * OFFSET_WIDTH, nextOffset); - nextOffset += (int) hiveVector.lengths[rowIndex]; + nextOffset += (int) hiveVector.lengths[selectedIndex]; arrowVector.setNotNull(rowIndex); } } offsetBuffer.setInt(size * OFFSET_WIDTH, nextOffset); } - private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, - int size) { + //Handle cases for both internally constructed + //and externally provided (isNative) VectorRowBatch + private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, + VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final 
PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory(); switch (primitiveCategory) { - case BOOLEAN: - { - final BitVector bitVector = (BitVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - bitVector.setNull(i); - } else { - bitVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]); - } - } + case BOOLEAN: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, boolNullSetter, boolValueSetter, typeInfo); + return; + } + final BitVector bitVector = (BitVector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + boolNullSetter.accept(i, arrowVector, hiveVector); + } else { + boolValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case BYTE: - { - final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - tinyIntVector.setNull(i); - } else { - tinyIntVector.set(i, (byte) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case BYTE: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, byteNullSetter, byteValueSetter, typeInfo); + return; + } + final TinyIntVector tinyIntVector = (TinyIntVector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + byteNullSetter.accept(i, arrowVector, hiveVector); + } else { + byteValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case SHORT: - { - final SmallIntVector smallIntVector = (SmallIntVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - smallIntVector.setNull(i); - } else { - smallIntVector.set(i, (short) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case SHORT: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, shortNullSetter, shortValueSetter, typeInfo); + return; + } + final SmallIntVector smallIntVector = (SmallIntVector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + shortNullSetter.accept(i, arrowVector, hiveVector); + } else { + shortValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case INT: - { - final IntVector intVector = (IntVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - intVector.setNull(i); - } else { - intVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case INT: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intNullSetter, intValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + intNullSetter.accept(i, arrowVector, hiveVector); + } else { + intValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case LONG: - { - final BigIntVector bigIntVector = (BigIntVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - bigIntVector.setNull(i); - } else { - bigIntVector.set(i, ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case LONG: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, longNullSetter, longValueSetter, typeInfo); + return; + } + 
final BigIntVector bigIntVector = (BigIntVector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + longNullSetter.accept(i, arrowVector, hiveVector); + } else { + longValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case FLOAT: - { - final Float4Vector float4Vector = (Float4Vector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - float4Vector.setNull(i); - } else { - float4Vector.set(i, (float) ((DoubleColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case FLOAT: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, floatNullSetter, floatValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + floatNullSetter.accept(i, arrowVector, hiveVector); + } else { + floatValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case DOUBLE: - { - final Float8Vector float8Vector = (Float8Vector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - float8Vector.setNull(i); - } else { - float8Vector.set(i, ((DoubleColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case DOUBLE: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, doubleNullSetter, doubleValueSetter, typeInfo); + return; + } + final Float8Vector float8Vector = (Float8Vector) arrowVector; + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + doubleNullSetter.accept(i, arrowVector, hiveVector); + } else { + doubleValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case STRING: - case VARCHAR: - case CHAR: - { - final VarCharVector varCharVector = (VarCharVector) arrowVector; - final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - varCharVector.setNull(i); - } else { - varCharVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]); - } - } + } + } + break; + case CHAR: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, charNullSetter, charValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + charNullSetter.accept(i, arrowVector, hiveVector); + } else { + charValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case DATE: - { - final DateDayVector dateDayVector = (DateDayVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - dateDayVector.setNull(i); - } else { - dateDayVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case STRING: + case VARCHAR: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, stringNullSetter, stringValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + stringNullSetter.accept(i, arrowVector, hiveVector); + } else { + stringValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case TIMESTAMP: - { - final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector; - final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - 
timeStampMicroTZVector.setNull(i); - } else { - // Time = second + sub-second - final long secondInMillis = timestampColumnVector.getTime(i); - final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS; - final long subSecondInMicros = timestampColumnVector.getNanos(i) / NS_PER_MICROS; - - if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) { - // If the timestamp cannot be represented in long microsecond, set it as a null value - timeStampMicroTZVector.setNull(i); - } else { - timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros); - } - } - } + } + } + break; + case DATE: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, dateNullSetter, dateValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + dateNullSetter.accept(i, arrowVector, hiveVector); + } else { + dateValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case BINARY: - { - final VarBinaryVector varBinaryVector = (VarBinaryVector) arrowVector; - final BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - varBinaryVector.setNull(i); - } else { - varBinaryVector.setSafe(i, bytesVector.vector[i], bytesVector.start[i], bytesVector.length[i]); - } - } + } + } + break; + case TIMESTAMP: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, timestampNullSetter, timestampValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + timestampNullSetter.accept(i, arrowVector, hiveVector); + } else { + timestampValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case DECIMAL: - { - final DecimalVector decimalVector = (DecimalVector) arrowVector; - final int scale = decimalVector.getScale(); - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - decimalVector.setNull(i); - } else { - decimalVector.set(i, - ((DecimalColumnVector) hiveVector).vector[i].getHiveDecimal().bigDecimalValue().setScale(scale)); - } - } + } + } + break; + case BINARY: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, binaryNullSetter, binaryValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + binaryNullSetter.accept(i, arrowVector, hiveVector); + } else { + binaryValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); } - break; - case INTERVAL_YEAR_MONTH: - { - final IntervalYearVector intervalYearVector = (IntervalYearVector) arrowVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - intervalYearVector.setNull(i); - } else { - intervalYearVector.set(i, (int) ((LongColumnVector) hiveVector).vector[i]); - } - } + } + } + break; + case DECIMAL: + { + if(isNative) { + if(hiveVector instanceof DecimalColumnVector) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, decimalNullSetter, decimalValueSetter, typeInfo); + } else { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, decimalNullSetter, decimal64ValueSetter, typeInfo); } - break; - case INTERVAL_DAY_TIME: - { - final IntervalDayVector intervalDayVector = (IntervalDayVector) 
arrowVector; - final IntervalDayTimeColumnVector intervalDayTimeColumnVector = - (IntervalDayTimeColumnVector) hiveVector; - for (int i = 0; i < size; i++) { - if (hiveVector.isNull[i]) { - intervalDayVector.setNull(i); - } else { - final long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(i); - final long days = totalSeconds / SECOND_PER_DAY; - final long millis = - (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND + - intervalDayTimeColumnVector.getNanos(i) / NS_PER_MILLIS; - intervalDayVector.set(i, (int) days, (int) millis); - } - } + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + decimalNullSetter.accept(i, arrowVector, hiveVector); + } else if(hiveVector instanceof DecimalColumnVector) { + decimalValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); + } else if(hiveVector instanceof Decimal64ColumnVector) { + decimal64ValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); + } else { + throw new IllegalArgumentException("Unsupported vector column type: " + hiveVector.getClass().getName()); } - break; - case VOID: - case UNKNOWN: - case TIMESTAMPLOCALTZ: - default: - throw new IllegalArgumentException(); + } + } + break; + case INTERVAL_YEAR_MONTH: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intervalYearMonthNullSetter, intervalYearMonthValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + intervalYearMonthNullSetter.accept(i, arrowVector, hiveVector); + } else { + intervalYearMonthValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); + } + } + } + break; + case INTERVAL_DAY_TIME: + { + if(isNative) { + writeGeneric(arrowVector, hiveVector, size, vectorizedRowBatch.selectedInUse, vectorizedRowBatch.selected, intervalDayTimeNullSetter, intervalDayTimeValueSetter, typeInfo); + return; + } + for (int i = 0; i < size; i++) { + if (hiveVector.isNull[i]) { + intervalDayTimeNullSetter.accept(i, arrowVector, hiveVector); + } else { + intervalDayTimeValueSetter.accept(i, i, arrowVector, hiveVector, typeInfo); + } + } + } + break; + case VOID: + case UNKNOWN: + case TIMESTAMPLOCALTZ: + default: + throw new IllegalArgumentException(); } } @@ -540,7 +716,7 @@ class Serializer { // if row is null, it means there are no more rows (closeOp()). // another case can be that the buffer is full. 
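// Illustrative caller pattern (editorial sketch; consume() is a hypothetical sink, everything else
// follows the serialize() contract in this patch):
//   ArrowWrapperWritable batch;
//   for (Object row : rows) {
//     batch = serDe.serialize(row, rowOI);  // non-null only once MAX_BUFFERED_ROWS rows are buffered
//     if (batch != null) { consume(batch); }
//   }
//   batch = serDe.serialize(null, rowOI);   // a null row flushes the remaining, partially filled batch
//   if (batch != null) { consume(batch); }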
if (obj == null) { - return serializeBatch(); + return serializeBatch(vectorizedRowBatch, false); } List<Object> standardObjects = new ArrayList<Object>(); ObjectInspectorUtils.copyToStandardObject(standardObjects, obj, @@ -549,8 +725,241 @@ class Serializer { vectorAssignRow.assignRow(vectorizedRowBatch, batchSize, standardObjects, fieldSize); batchSize++; if (batchSize == MAX_BUFFERED_ROWS) { - return serializeBatch(); + return serializeBatch(vectorizedRowBatch, false); } return null; } + + //Use a provided nullSetter and valueSetter function to populate + //fieldVector from hiveVector + private static void writeGeneric(final FieldVector fieldVector, final ColumnVector hiveVector, final int size, final boolean selectedInUse, final int[] selected, final IntAndVectorsConsumer nullSetter, final IntIntAndVectorsConsumer valueSetter, TypeInfo typeInfo) + { + final boolean[] inputIsNull = hiveVector.isNull; + final int[] sel = selected; + + if (hiveVector.isRepeating) { + if (hiveVector.noNulls || !inputIsNull[0]) { + for(int i = 0; i < size; i++) { + //Fill n rows with value in row 0 + valueSetter.accept(i, 0, fieldVector, hiveVector, typeInfo); + } + } else { + for(int i = 0; i < size; i++) { + //Fill n rows with NULL + nullSetter.accept(i, fieldVector, hiveVector); + } + } + return; + } + + if (hiveVector.noNulls) { + if (selectedInUse) { + for(int logical = 0; logical < size; logical++) { + final int batchIndex = sel[logical]; + //Add row batchIndex + valueSetter.accept(logical, batchIndex, fieldVector, hiveVector, typeInfo); + } + } else { + for(int batchIndex = 0; batchIndex < size; batchIndex++) { + //Add row batchIndex + valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector, typeInfo); + } + } + } else { + if (selectedInUse) { + for(int logical = 0; logical < size; logical++) { + final int batchIndex = sel[logical]; + if (inputIsNull[batchIndex]) { + //Add NULL + nullSetter.accept(batchIndex, fieldVector, hiveVector); + } else { + //Add row batchIndex + valueSetter.accept(logical, batchIndex, fieldVector, hiveVector, typeInfo); + } + } + } else { + for(int batchIndex = 0; batchIndex < size; batchIndex++) { + if (inputIsNull[batchIndex]) { + //Add NULL + nullSetter.accept(batchIndex, fieldVector, hiveVector); + } else { + //Add row batchIndex + valueSetter.accept(batchIndex, batchIndex, fieldVector, hiveVector, typeInfo); + } + } + } + } + } + + //nullSetters and valueSetter for each type + + //bool + private static final IntAndVectorsConsumer boolNullSetter = (i, arrowVector, hiveVector) + -> ((BitVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer boolValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((BitVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]); + + //byte + private static final IntAndVectorsConsumer byteNullSetter = (i, arrowVector, hiveVector) + -> ((TinyIntVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer byteValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((TinyIntVector) arrowVector).set(i, (byte) ((LongColumnVector) hiveVector).vector[j]); + + //short + private static final IntAndVectorsConsumer shortNullSetter = (i, arrowVector, hiveVector) + -> ((SmallIntVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer shortValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((SmallIntVector) arrowVector).set(i, (short) ((LongColumnVector) hiveVector).vector[j]); + + //int + private static final IntAndVectorsConsumer 
intNullSetter = (i, arrowVector, hiveVector) + -> ((IntVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer intValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((IntVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]); + + //long + private static final IntAndVectorsConsumer longNullSetter = (i, arrowVector, hiveVector) + -> ((BigIntVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer longValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((BigIntVector) arrowVector).set(i, ((LongColumnVector) hiveVector).vector[j]); + + //float + private static final IntAndVectorsConsumer floatNullSetter = (i, arrowVector, hiveVector) + -> ((Float4Vector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer floatValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((Float4Vector) arrowVector).set(i, (float) ((DoubleColumnVector) hiveVector).vector[j]); + + //double + private static final IntAndVectorsConsumer doubleNullSetter = (i, arrowVector, hiveVector) + -> ((Float8Vector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer doubleValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((Float8Vector) arrowVector).set(i, ((DoubleColumnVector) hiveVector).vector[j]); + + //string/varchar + private static final IntAndVectorsConsumer stringNullSetter = (i, arrowVector, hiveVector) + -> ((VarCharVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer stringValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> { + BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; + ((VarCharVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]); + }; + + //fixed-length CHAR + private static final IntAndVectorsConsumer charNullSetter = (i, arrowVector, hiveVector) + -> ((VarCharVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer charValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> { + BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; + VarCharVector varCharVector = (VarCharVector) arrowVector; + byte[] bytes = bytesVector.vector[j]; + int length = bytesVector.length[j]; + int start = bytesVector.start[j]; + + if (bytes == null) { + bytes = EMPTY_BYTES; + start = 0; + length = 0; + } + + final CharTypeInfo charTypeInfo = (CharTypeInfo) typeInfo; + final int paddedLength = charTypeInfo.getLength(); + final byte[] paddedBytes = StringExpr.padRight(bytes, start, length, paddedLength); + varCharVector.setSafe(i, paddedBytes, 0, paddedBytes.length); + }; + + //date + private static final IntAndVectorsConsumer dateNullSetter = (i, arrowVector, hiveVector) + -> ((DateDayVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer dateValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((DateDayVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]); + + //timestamp + private static final IntAndVectorsConsumer timestampNullSetter = (i, arrowVector, hiveVector) + -> ((TimeStampMicroTZVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer timestampValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> { + final TimeStampMicroTZVector timeStampMicroTZVector = (TimeStampMicroTZVector) arrowVector; + final TimestampColumnVector timestampColumnVector = (TimestampColumnVector) hiveVector; + // Time = second + sub-second + final long secondInMillis = 
timestampColumnVector.getTime(j); + final long secondInMicros = (secondInMillis - secondInMillis % MILLIS_PER_SECOND) * MICROS_PER_MILLIS; + final long subSecondInMicros = timestampColumnVector.getNanos(j) / NS_PER_MICROS; + if ((secondInMillis > 0 && secondInMicros < 0) || (secondInMillis < 0 && secondInMicros > 0)) { + // If the timestamp cannot be represented in long microsecond, set it as a null value + timeStampMicroTZVector.setNull(i); + } else { + timeStampMicroTZVector.set(i, secondInMicros + subSecondInMicros); + } + }; + + //binary + private static final IntAndVectorsConsumer binaryNullSetter = (i, arrowVector, hiveVector) + -> ((VarBinaryVector) arrowVector).setNull(i); + private static final IntIntAndVectorsConsumer binaryValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> { + BytesColumnVector bytesVector = (BytesColumnVector) hiveVector; + ((VarBinaryVector) arrowVector).setSafe(i, bytesVector.vector[j], bytesVector.start[j], bytesVector.length[j]); + }; + + //decimal and decimal64 + private static final IntAndVectorsConsumer decimalNullSetter = (i, arrowVector, hiveVector) + -> ((DecimalVector) arrowVector).setNull(i); + private final IntIntAndVectorsConsumer decimalValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> { + final DecimalVector decimalVector = (DecimalVector) arrowVector; + final int scale = decimalVector.getScale(); + decimalVector.set(i, ((DecimalColumnVector) hiveVector).vector[j].getHiveDecimal().bigDecimalValue().setScale(scale)); + + final HiveDecimalWritable writable = ((DecimalColumnVector) hiveVector).vector[i]; + decimalHolder.precision = writable.precision(); + decimalHolder.scale = scale; + try (ArrowBuf arrowBuf = allocator.buffer(DecimalHolder.WIDTH)) { + decimalHolder.buffer = arrowBuf; + final BigInteger bigInteger = new BigInteger(writable.getInternalStorage()). 
+ multiply(BigInteger.TEN.pow(scale - writable.scale())); + decimalVector.set(i, new BigDecimal(bigInteger, scale)); + } + }; + private static final IntIntAndVectorsConsumer decimal64ValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> { + final DecimalVector decimalVector = (DecimalVector) arrowVector; + final int scale = decimalVector.getScale(); + HiveDecimalWritable decimalHolder = new HiveDecimalWritable(); + decimalHolder.setFromLongAndScale(((Decimal64ColumnVector) hiveVector).vector[j], scale); + decimalVector.set(i, decimalHolder.getHiveDecimal().bigDecimalValue().setScale(scale)); + }; + + //interval year + private static final IntAndVectorsConsumer intervalYearMonthNullSetter = (i, arrowVector, hiveVector) + -> ((IntervalYearVector) arrowVector).setNull(i); + private static IntIntAndVectorsConsumer intervalYearMonthValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> ((IntervalYearVector) arrowVector).set(i, (int) ((LongColumnVector) hiveVector).vector[j]); + + //interval day + private static final IntAndVectorsConsumer intervalDayTimeNullSetter = (i, arrowVector, hiveVector) + -> ((IntervalDayVector) arrowVector).setNull(i); + private static IntIntAndVectorsConsumer intervalDayTimeValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + -> { + final IntervalDayVector intervalDayVector = (IntervalDayVector) arrowVector; + final IntervalDayTimeColumnVector intervalDayTimeColumnVector = + (IntervalDayTimeColumnVector) hiveVector; + long totalSeconds = intervalDayTimeColumnVector.getTotalSeconds(j); + final long days = totalSeconds / SECOND_PER_DAY; + final long millis = + (totalSeconds - days * SECOND_PER_DAY) * MILLIS_PER_SECOND + + intervalDayTimeColumnVector.getNanos(j) / NS_PER_MILLIS; + intervalDayVector.set(i, (int) days, (int) millis); + }; + + //Used for setting null at arrowVector[i] + private interface IntAndVectorsConsumer { + void accept(int i, FieldVector arrowVector, ColumnVector hiveVector); + } + + //Used to copy value from hiveVector[j] -> arrowVector[i] + //since hiveVector might be referenced through vector.selected + private interface IntIntAndVectorsConsumer { + void accept(int i, int j, FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo); + } + } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 1946cecc084..2dd12ef1918 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor; +import org.apache.hadoop.hive.ql.exec.vector.filesink.VectorFileSinkArrowOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyStringOperator; @@ -4120,6 +4121,48 @@ public class Vectorizer implements PhysicalPlanResolver { return true; } + private boolean checkForArrowFileSink(FileSinkDesc fileSinkDesc, + boolean isTezOrSpark, VectorizationContext vContext, + VectorFileSinkDesc vectorDesc) throws HiveException { + + // Various restrictions. 
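// In short (editorial summary of the checks below): the Arrow-native file sink is only chosen when the
// table's SerDe is org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe, when
// HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED is true, and when the execution
// engine is "tez"; in every other case the caller keeps the existing non-native file sink (isNative = false).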
+ + boolean isVectorizationFileSinkArrowNativeEnabled = + HiveConf.getBoolVar(hiveConf, + HiveConf.ConfVars.HIVE_VECTORIZATION_FILESINK_ARROW_NATIVE_ENABLED); + + String engine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); + + String serdeClassName = fileSinkDesc.getTableInfo().getSerdeClassName(); + + boolean isOkArrowFileSink = + serdeClassName.equals("org.apache.hadoop.hive.ql.io.arrow.ArrowColumnarBatchSerDe") && + isVectorizationFileSinkArrowNativeEnabled && + engine.equalsIgnoreCase("tez"); + + return isOkArrowFileSink; + } + + private Operator<? extends OperatorDesc> specializeArrowFileSinkOperator( + Operator<? extends OperatorDesc> op, VectorizationContext vContext, FileSinkDesc desc, + VectorFileSinkDesc vectorDesc) throws HiveException { + + Class<? extends Operator<?>> opClass = VectorFileSinkArrowOperator.class; + + Operator<? extends OperatorDesc> vectorOp = null; + try { + vectorOp = OperatorFactory.getVectorOperator( + opClass, op.getCompilationOpContext(), op.getConf(), + vContext, vectorDesc); + } catch (Exception e) { + LOG.info("Vectorizer vectorizeOperator file sink class exception " + opClass.getSimpleName() + + " exception " + e); + throw new HiveException(e); + } + + return vectorOp; + } + private boolean usesVectorUDFAdaptor(VectorExpression vecExpr) { if (vecExpr == null) { return false; @@ -5130,9 +5173,20 @@ public class Vectorizer implements PhysicalPlanResolver { FileSinkDesc fileSinkDesc = (FileSinkDesc) op.getConf(); VectorFileSinkDesc vectorFileSinkDesc = new VectorFileSinkDesc(); - vectorOp = OperatorFactory.getVectorOperator( - op.getCompilationOpContext(), fileSinkDesc, vContext, vectorFileSinkDesc); - isNative = false; + boolean isArrowSpecialization = + checkForArrowFileSink(fileSinkDesc, isTezOrSpark, vContext, vectorFileSinkDesc); + + if (isArrowSpecialization) { + vectorOp = + specializeArrowFileSinkOperator( + op, vContext, fileSinkDesc, vectorFileSinkDesc); + isNative = true; + } else { + vectorOp = + OperatorFactory.getVectorOperator( + op.getCompilationOpContext(), fileSinkDesc, vContext, vectorFileSinkDesc); + isNative = false; + } } break; case LIMIT: diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java index 10484ac9252..bf20aae5195 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.arrow; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.arrow.vector.VarCharVector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -102,6 +103,7 @@ public class TestArrowColumnarBatchSerDe { {text(""), charW("", 10), varcharW("", 10)}, {text("Hello"), charW("Hello", 10), varcharW("Hello", 10)}, {text("world!"), charW("world!", 10), varcharW("world!", 10)}, + {text("안녕?"), charW("안녕?", 10), varcharW("안녕?", 10)}, {null, null, null}, }; @@ -525,6 +527,31 @@ public class TestArrowColumnarBatchSerDe { initAndSerializeAndDeserialize(schema, DECIMAL_ROWS); } + @Test + public void testRandomPrimitiveDecimal() throws SerDeException { + String[][] schema = { + {"decimal1", "decimal(38,10)"}, + }; + + int size = 1000; + Object[][] randomDecimals = new 
Object[size][]; + Random random = new Random(); + for (int i = 0; i < size; i++) { + StringBuilder builder = new StringBuilder(); + builder.append(random.nextBoolean() ? '+' : '-'); + for (int j = 0; j < 28 ; j++) { + builder.append(random.nextInt(10)); + } + builder.append('.'); + for (int j = 0; j < 10; j++) { + builder.append(random.nextInt(10)); + } + randomDecimals[i] = new Object[] {decimalW(HiveDecimal.create(builder.toString()))}; + } + + initAndSerializeAndDeserialize(schema, randomDecimals); + } + @Test public void testPrimitiveBoolean() throws SerDeException { String[][] schema = { @@ -768,6 +795,32 @@ public class TestArrowColumnarBatchSerDe { initAndSerializeAndDeserialize(schema, toMap(BINARY_ROWS)); } + @Test + public void testPrimitiveCharPadding() throws SerDeException { + String[][] schema = { + {"char1", "char(10)"}, + }; + + HiveCharWritable[][] rows = new HiveCharWritable[][] { + {charW("Hello", 10)}, {charW("world!", 10)}}; + ArrowColumnarBatchSerDe serDe = new ArrowColumnarBatchSerDe(); + StructObjectInspector rowOI = initSerDe(serDe, schema); + + ArrowWrapperWritable serialized = null; + for (Object[] row : rows) { + serialized = serDe.serialize(row, rowOI); + } + // Pass null to complete a batch + if (serialized == null) { + serialized = serDe.serialize(null, rowOI); + } + + VarCharVector varCharVector = (VarCharVector) serialized.getVectorSchemaRoot().getFieldVectors().get(0); + for (int i = 0; i < rows.length; i++) { + assertEquals(rows[i][0].getPaddedValue().toString(), new String(varCharVector.get(i))); + } + } + public void testMapDecimal() throws SerDeException { String[][] schema = { {"decimal_map", "map<string,decimal(38,10)>"}, diff --git a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java index 162e8e6353a..6ae2969171e 100644 --- a/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java +++ b/storage-api/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/StringExpr.java @@ -129,6 +129,21 @@ public class StringExpr { return charCount; } + public static byte[] padRight(byte[] bytes, int start, int length, int maxCharacterLength) { + final byte[] resultBytes; + final int characterLength = StringExpr.characterCount(bytes, start, length); + final int blankPadLength = Math.max(maxCharacterLength - characterLength, 0); + final int resultLength = length + blankPadLength; + resultBytes = new byte[resultLength]; + final int resultStart = 0; + System.arraycopy(bytes, start, resultBytes, resultStart, length); + final int padEnd = resultStart + resultLength; + for (int p = resultStart + length; p < padEnd; p++) { + resultBytes[p] = ' '; + } + return resultBytes; + } + // A setVal with the same function signature as rightTrim, leftTrim, truncate, etc, below. // Useful for class generation via templates. public static void assign(BytesColumnVector outV, int i, byte[] bytes, int start, int length) {
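// Editorial sketch of the new padRight helper in use (assumes a UTF-8 string and
// java.nio.charset.StandardCharsets; not part of this patch):
//   byte[] value = "Hello".getBytes(StandardCharsets.UTF_8);
//   byte[] padded = StringExpr.padRight(value, 0, value.length, 10);
//   // padded is "Hello" followed by five blanks -- the char(10)-style padded form that
//   // charValueSetter in the Arrow Serializer above relies on.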