This is an automated email from the ASF dual-hosted git repository. jdere pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 46ed5c9 HIVE-20312: Allow arrow clients to use their own BufferAllocator with LlapOutputFormatService (Eric Wohlstadter, reviewed by Teddy Choi) 46ed5c9 is described below commit 46ed5c9064eb1ff14c18cdbffe8f8338928505c5 Author: Eric Wohlstadter <wohls...@gmail.com> AuthorDate: Mon Feb 10 12:35:23 2020 -0800 HIVE-20312: Allow arrow clients to use their own BufferAllocator with LlapOutputFormatService (Eric Wohlstadter, reviewed by Teddy Choi) --- .../hive/llap/LlapArrowBatchRecordReader.java | 15 +++++++++++-- .../hadoop/hive/llap/LlapArrowRowInputFormat.java | 14 +++++++++++- .../hadoop/hive/llap/LlapBaseInputFormat.java | 25 ++++++++++++++++++---- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java index d9c5666..cb3d9cc 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowBatchRecordReader.java @@ -39,13 +39,21 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe private BufferAllocator allocator; private ArrowStreamReader arrowStreamReader; + //Allows client to provide and manage their own arrow BufferAllocator public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz, - JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { + JobConf job, Closeable client, Socket socket, BufferAllocator allocator) throws IOException { super(in, schema, clazz, job, client, socket); - allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit); + this.allocator = allocator; this.arrowStreamReader = new ArrowStreamReader(socket.getInputStream(), allocator); } + //Use the global 
arrow BufferAllocator + public LlapArrowBatchRecordReader(InputStream in, Schema schema, Class<ArrowWrapperWritable> clazz, + JobConf job, Closeable client, Socket socket, long arrowAllocatorLimit) throws IOException { + this(in, schema, clazz, job, client, socket, + RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit)); + } + @Override public boolean next(NullWritable key, ArrowWrapperWritable value) throws IOException { try { @@ -76,6 +84,9 @@ public class LlapArrowBatchRecordReader extends LlapBaseRecordReader<ArrowWrappe @Override public void close() throws IOException { arrowStreamReader.close(); + //allocator.close() will throw exception unless all buffers have been released + //See org.apache.arrow.memory.BaseAllocator.close() + allocator.close(); } } diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java index fafbdee..46566be 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapArrowRowInputFormat.java @@ -25,16 +25,28 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import java.io.IOException; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.hadoop.hive.ql.io.arrow.RootAllocatorFactory; +import java.util.UUID; /* * Adapts an Arrow batch reader to a row reader + * Only used for testing */ public class LlapArrowRowInputFormat implements InputFormat<NullWritable, Row> { private LlapBaseInputFormat baseInputFormat; public LlapArrowRowInputFormat(long arrowAllocatorLimit) { - baseInputFormat = new LlapBaseInputFormat(true, arrowAllocatorLimit); + BufferAllocator allocator = RootAllocatorFactory.INSTANCE.getOrCreateRootAllocator(arrowAllocatorLimit).newChildAllocator( + //allocator name, use UUID for testing + 
UUID.randomUUID().toString(), + //No use for reservation, allocators claim memory from the same pool, + //but allocate/releases are tracked per-allocator + 0, + //Limit passed in by client + arrowAllocatorLimit); + baseInputFormat = new LlapBaseInputFormat(true, allocator); } @Override diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java index 5c99655..6bf7f33 100644 --- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java +++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java @@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; +import org.apache.arrow.memory.BufferAllocator; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.security.Credentials; @@ -107,6 +108,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> private String query; private boolean useArrow; private long arrowAllocatorLimit; + private BufferAllocator allocator; private final Random rand = new Random(); public static final String URL_KEY = "llap.if.hs2.connection"; @@ -129,11 +131,17 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> this.query = query; } + //Exposed only for testing, clients should use LlapBaseInputFormat(boolean, BufferAllocator) instead public LlapBaseInputFormat(boolean useArrow, long arrowAllocatorLimit) { this.useArrow = useArrow; this.arrowAllocatorLimit = arrowAllocatorLimit; } + public LlapBaseInputFormat(boolean useArrow, BufferAllocator allocator) { + this.useArrow = useArrow; + this.allocator = allocator; + } + public LlapBaseInputFormat() { this.useArrow = false; } @@ -214,10 +222,19 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>> 
@SuppressWarnings("rawtypes") LlapBaseRecordReader recordReader; if(useArrow) { - recordReader = new LlapArrowBatchRecordReader( - socket.getInputStream(), llapSplit.getSchema(), - ArrowWrapperWritable.class, job, llapClient, socket, - arrowAllocatorLimit); + if(allocator != null) { + //Client provided their own allocator + recordReader = new LlapArrowBatchRecordReader( + socket.getInputStream(), llapSplit.getSchema(), + ArrowWrapperWritable.class, job, llapClient, socket, + allocator); + } else { + //Client did not provide their own allocator, use constructor for global allocator + recordReader = new LlapArrowBatchRecordReader( + socket.getInputStream(), llapSplit.getSchema(), + ArrowWrapperWritable.class, job, llapClient, socket, + arrowAllocatorLimit); + } } else { recordReader = new LlapBaseRecordReader(socket.getInputStream(), llapSplit.getSchema(), BytesWritable.class, job, llapClient, (java.io.Closeable)socket);